Browse Source

fix code smell

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
7a976f4c89
  1. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java
  2. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java
  3. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java
  4. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java
  5. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java
  6. 11
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java
  7. 11
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java
  8. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
  9. 27
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java
  10. 11
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java

@ -30,6 +30,10 @@ import io.protostuff.runtime.RuntimeSchema;
*/ */
public class ProtoStuffUtils { public class ProtoStuffUtils {
private ProtoStuffUtils() {
throw new IllegalStateException("Utility class");
}
private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>(); private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java

@ -24,6 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class ConsumerConfigCache { public class ConsumerConfigCache {
private ConsumerConfigCache() {
throw new IllegalStateException("Utility class");
}
private static ConcurrentHashMap<String, ConsumerConfig> consumerMap = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, ConsumerConfig> consumerMap = new ConcurrentHashMap<>();
public static ConsumerConfig getConfigByServersName(String serviceName) { public static ConsumerConfig getConfigByServersName(String serviceName) {

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java

@ -75,11 +75,8 @@ public class ConsumerInterceptor {
request.setClassName(method.getDeclaringClass().getSimpleName()); request.setClassName(method.getDeclaringClass().getSimpleName());
request.setMethodName(method.getName()); request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes()); request.setParameterTypes(method.getParameterTypes());
request.setParameters(args); request.setParameters(args);
String serviceName = method.getDeclaringClass().getName();
return request; return request;
} }

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java

@ -24,6 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class RpcRequestTable { public class RpcRequestTable {
private RpcRequestTable() {
throw new IllegalStateException("Utility class");
}
private static ConcurrentHashMap<String, RpcRequestCache> requestMap = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, RpcRequestCache> requestMap = new ConcurrentHashMap<>();
public static void put(String requestId, RpcRequestCache rpcRequestCache) { public static void put(String requestId, RpcRequestCache rpcRequestCache) {

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java

@ -22,6 +22,10 @@ package org.apache.dolphinscheduler.rpc.common;
*/ */
public class ConsumerConfigConstants { public class ConsumerConfigConstants {
private ConsumerConfigConstants() {
throw new IllegalStateException("Utility class");
}
public static final Boolean DEFAULT_SYNC = false; public static final Boolean DEFAULT_SYNC = false;
public static final Integer DEFAULT_RETRIES = 3; public static final Integer DEFAULT_RETRIES = 3;

11
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java

@ -29,13 +29,12 @@ public enum ThreadPoolManager {
ExecutorService executorService; ExecutorService executorService;
private static final int WORK_QUEUE_SIZE = 200;
private static final long KEEP_ALIVE_TIME = 60;
ThreadPoolManager() { ThreadPoolManager() {
int SIZE_WORK_QUEUE = 200; executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 4, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
long KEEP_ALIVE_TIME = 60; new ArrayBlockingQueue<>(WORK_QUEUE_SIZE),
int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
int MAXI_MUM_POOL_SIZE = CORE_POOL_SIZE * 4;
executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXI_MUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(SIZE_WORK_QUEUE),
new DiscardPolicy()); new DiscardPolicy());
} }

11
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java

@ -36,10 +36,14 @@ public class ServiceBean {
private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class); private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class);
private static Map<String, Object> serviceMap = new HashMap<>(); private static Map<String, Class> serviceMap = new HashMap<>();
private static AtomicBoolean initialized = new AtomicBoolean(false); private static AtomicBoolean initialized = new AtomicBoolean(false);
private ServiceBean() {
throw new IllegalStateException("Utility class");
}
private static synchronized void init() { private static synchronized void init() {
// todo config // todo config
Reflections f = new Reflections("org/apache/dolphinscheduler/rpc"); Reflections f = new Reflections("org/apache/dolphinscheduler/rpc");
@ -47,16 +51,17 @@ public class ServiceBean {
list.forEach(rpcClass -> { list.forEach(rpcClass -> {
RpcService rpcService = rpcClass.getAnnotation(RpcService.class); RpcService rpcService = rpcClass.getAnnotation(RpcService.class);
serviceMap.put(rpcService.value(), rpcClass); serviceMap.put(rpcService.value(), rpcClass);
logger.info("load rpc service {}", rpcService.value());
}); });
} }
public static Class getServiceClass(String className) { public static Class getServiceClass(String className) {
if (initialized.get()) { if (initialized.get()) {
return (Class) serviceMap.get(className); return serviceMap.get(className);
} else { } else {
init(); init();
} }
return (Class) serviceMap.get(className); return serviceMap.get(className);
} }
} }

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java

@ -199,7 +199,7 @@ public class NettyClient {
String serviceName = request.getClassName() + request.getMethodName(); String serviceName = request.getClassName() + request.getMethodName();
rpcRequestCache.setServiceName(serviceName); rpcRequestCache.setServiceName(serviceName);
RpcFuture future = null; RpcFuture future = null;
if (!async) { if (Boolean.FALSE.equals(async)) {
future = new RpcFuture(request); future = new RpcFuture(request);
rpcRequestCache.setRpcFuture(future); rpcRequestCache.setRpcFuture(future);
} }
@ -207,13 +207,14 @@ public class NettyClient {
channel.writeAndFlush(request); channel.writeAndFlush(request);
RpcResponse result = null; RpcResponse result = null;
if (async) { if (Boolean.TRUE.equals(async)) {
result = new RpcResponse(); result = new RpcResponse();
result.setStatus((byte) 0); result.setStatus((byte) 0);
result.setResult(true); result.setResult(true);
return result; return result;
} }
try { try {
assert future != null;
result = future.get(); result = future.get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
logger.error("send msg error,service name is {}", serviceName, e); logger.error("send msg error,service name is {}", serviceName, e);

27
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java

@ -21,13 +21,13 @@ import org.apache.dolphinscheduler.rpc.client.ConsumerConfig;
import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache; import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.rpc.common.RequestEventType;
import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
import org.apache.dolphinscheduler.rpc.future.RpcFuture; import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -36,7 +36,6 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.FastThreadLocalThread;
/** /**
* NettyClientHandler * NettyClientHandler
@ -47,11 +46,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; private static final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE;
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
ctx.channel().close(); ctx.channel().close();
} }
@ -70,7 +68,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest) { private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest) {
String serviceName = rpcRequest.getServiceName(); String serviceName = rpcRequest.getServiceName();
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (!consumerConfig.getAsync()) { if (Boolean.FALSE.equals(consumerConfig.getAsync())) {
RpcFuture future = rpcRequest.getRpcFuture(); RpcFuture future = rpcRequest.getRpcFuture();
RpcRequestTable.remove(rsp.getRequestId()); RpcRequestTable.remove(rsp.getRequestId());
future.done(rsp); future.done(rsp);
@ -78,31 +76,26 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
} }
//async //async
new FastThreadLocalThread(() -> {
try {
if (rsp.getStatus() == 0) { if (rsp.getStatus() == 0) {
try { try {
consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult()); consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult());
} catch (InvocationTargetException | NoSuchMethodException e) { } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
logger.error("rpc call back error, serviceName {} ", serviceName, e); logger.error("rpc service call back error,serviceName {},rsp {}", serviceName, rsp);
} }
} else { } else {
logger.error("rpc response error ,serviceName {}", serviceName); logger.error("rpc response error ,serviceName {},rsp {}", serviceName, rsp);
}
} catch (InstantiationException | IllegalAccessException e) {
logger.error("execute async error,serviceName {}", serviceName, e);
} }
}).start();
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
RpcRequest request = new RpcRequest(); RpcRequest request = new RpcRequest();
request.setEventType((byte) 0); request.setEventType(RequestEventType.HEARTBEAT.getType());
ctx.channel().writeAndFlush(request); ctx.channel().writeAndFlush(request);
logger.debug("send heart beat msg..."); logger.debug("send heart beat msg...");

11
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.rpc.remote; package org.apache.dolphinscheduler.rpc.remote;
import org.apache.dolphinscheduler.rpc.common.RequestEventType;
import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
@ -38,12 +39,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; private static final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE;
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) {
@ -61,7 +57,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
RpcRequest req = (RpcRequest) msg; RpcRequest req = (RpcRequest) msg;
if (req.getEventType() == 0) { if (req.getEventType().equals(RequestEventType.HEARTBEAT.getType())) {
logger.info("accept heartbeat msg"); logger.info("accept heartbeat msg");
return; return;
@ -111,7 +107,6 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("exceptionCaught");
logger.error("exceptionCaught : {}", cause.getMessage(), cause); logger.error("exceptionCaught : {}", cause.getMessage(), cause);
ctx.channel().close(); ctx.channel().close();
} }

Loading…
Cancel
Save