Browse Source

rpc

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
4ae52cb9b0
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
  2. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java
  3. 14
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
  4. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java
  5. 13
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java
  6. 8
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java
  7. 40
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java
  8. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java
  9. 22
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java
  10. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java
  11. 8
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java
  12. 67
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java
  13. 33
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
  14. 29
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java
  15. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java

@ -56,7 +56,7 @@ public class NettyServerConfig {
/**
* listen port
*/
private int listenPort = 12336;
private int listenPort = 12636;
public int getListenPort() {
return listenPort;

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java

@ -9,5 +9,7 @@ import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
public interface IUserService {
@Rpc(async = true,callback = UserCallback.class)
String say(String sb);
Boolean say(String sb);
String hi(int num);
}

14
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java

@ -18,13 +18,21 @@ public class MainTest {
// NettyClient nettyClient=new NettyClient(new NettyClientConfig());
Host host = new Host("127.0.0.1", 12366);
Host host = new Host("127.0.0.1", 12636);
IRpcClient rpcClient = new RpcClient();
UserService userService = rpcClient.create(UserService.class);
String result = userService.say("calvin");
IUserService userService = rpcClient.create(UserService.class,host);
boolean result = userService.say("calvin");
System.out.println( "异步回掉成功"+result);
System.out.println(userService.hi(10));
System.out.println(userService.hi(188888888));
UserService user = rpcClient.create(UserService.class,host);
System.out.println(user.hi(99999));
System.out.println(user.hi(998888888));
// UserCallback.class.newInstance().run("lllll");
// nettyClient.sendMsg(host,rpcRequest);
}

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java

@ -9,7 +9,7 @@ import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack;
public class UserCallback extends AbstractRpcCallBack {
@Override
public void run(Object object) {
String msg= (String) object;
System.out.println("我是异步回调"+msg);
Boolean msg= (Boolean) object;
System.out.println("我是异步回调handle Kris"+msg);
}
}

13
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java

@ -6,10 +6,15 @@ import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
* @author jiangli
* @date 2021-01-11 21:05
*/
public class UserService implements IUserService{
public class UserService implements IUserService {
@Override
@Rpc(async = true,callback = UserCallback.class,retries = 9999,isOneway = false)
public String say(String s) {
return "krris"+s;
@Rpc(async = true, callback = UserCallback.class, retries = 9999, isOneway = false)
public Boolean say(String s) {
return true;
}
@Override
public String hi(int num) {
return "this world has " + num + "sun";
}
}

8
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java

@ -7,13 +7,13 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class ConsumerConfigCache {
private static ConcurrentHashMap<String, ConsumerConfig> consumerMap=new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, ConsumerConfig> consumerMap = new ConcurrentHashMap<>();
public static ConsumerConfig getConfigByServersName(String serviceName){
public static ConsumerConfig getConfigByServersName(String serviceName) {
return consumerMap.get(serviceName);
}
public static void putConfig(String serviceName,ConsumerConfig consumerConfig){
consumerMap.putIfAbsent(serviceName,consumerConfig);
public static void putConfig(String serviceName, ConsumerConfig consumerConfig) {
consumerMap.putIfAbsent(serviceName, consumerConfig);
}
}

40
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java

@ -1,20 +1,20 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.function.Consumer;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
/**
* ConsumerInterceptor
@ -26,30 +26,41 @@ public class ConsumerInterceptor {
private FilterChain filterChain;
private NettyClient nettyClient=new NettyClient(new NettyClientConfig());
private Host host;
private NettyClient nettyClient = NettyClient.getInstance();
public ConsumerInterceptor(Invoker invoker) {
public ConsumerInterceptor(Invoker invoker, Host host) {
this.filterChain = new FilterChain(invoker);
this.invoker = this.filterChain.buildFilterChain();
this.host = host;
}
@RuntimeType
public Object intercept(@AllArguments Object[] args, @Origin Method method) throws Throwable {
public Object intercept(@AllArguments Object[] args, @Origin Method method) throws RemotingException {
RpcRequest request = buildReq(args, method);
String serviceName = method.getDeclaringClass().getName() + method;
String serviceName = method.getDeclaringClass().getName() + method.getName();
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (null == consumerConfig) {
consumerConfig = cacheServiceConfig(method, serviceName);
}
boolean async = consumerConfig.getAsync();
//load balance
Host host = new Host("127.0.0.1", 12336);
int retries = consumerConfig.getRetries();
while (retries-- > 0) {
RpcResponse rsp = (RpcResponse) nettyClient.sendMsg(host, request, async);
//success
if (null != rsp && rsp.getStatus() == 0) {
return rsp.getResult();
}
}
// execute fail
throw new RemotingException("send msg error");
return nettyClient.sendMsg(host, request, async);
}
private RpcRequest buildReq(Object[] args, Method method) {
@ -61,7 +72,7 @@ public class ConsumerInterceptor {
request.setParameters(args);
String serviceName = method.getDeclaringClass().getName() + method;
String serviceName = method.getDeclaringClass().getName();
return request;
}
@ -77,6 +88,7 @@ public class ConsumerInterceptor {
consumerConfig.setRetries(rpc.retries());
consumerConfig.setOneway(rpc.isOneway());
}
ConsumerConfigCache.putConfig(serviceName, consumerConfig);
return consumerConfig;

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java

@ -1,11 +1,13 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* IRpcClient
*/
public interface IRpcClient {
<T> T create(Class<T> clazz) throws Exception;
<T> T create(Class<T> clazz, Host host) throws Exception;
}

22
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java

@ -1,15 +1,14 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.concurrent.ConcurrentHashMap;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation;
/**
* RpcClient
*/
@ -18,18 +17,19 @@ public class RpcClient implements IRpcClient{
private ConcurrentHashMap<String,Object> classMap=new ConcurrentHashMap<>();
@Override
public <T> T create(Class<T> clazz) throws Exception {
if(!classMap.containsKey(clazz.getName())){
public <T> T create(Class<T> clazz,Host host) throws Exception {
// if(!classMap.containsKey(clazz.getName())){
T proxy = new ByteBuddy()
.subclass(clazz)
.method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker())))
.method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker(),host)))
.make()
.load(getClass().getClassLoader())
.getLoaded()
.getDeclaredConstructor().newInstance();
classMap.putIfAbsent(clazz.getName(),proxy);
}
return (T) classMap.get(clazz.getName());
// classMap.putIfAbsent(clazz.getName(),proxy);
return proxy;
// }
// return (T) classMap.get(clazz.getName());
}
}

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java

@ -1,7 +1,5 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
import java.util.concurrent.ConcurrentHashMap;
/**

8
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java

@ -33,15 +33,15 @@ public class RpcFuture implements Future<Object> {
}
@Override
public Object get() throws InterruptedException, ExecutionException {
public RpcResponse get() throws InterruptedException, ExecutionException {
boolean b = latch.await(5,TimeUnit.SECONDS);
return response.getResult();
return response;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean b = latch.await(timeout,unit);
return response.getResult();
return response;
}
public void done(RpcResponse response){

67
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java

@ -1,31 +1,13 @@
package org.apache.dolphinscheduler.remote.rpc.remote;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.apache.dolphinscheduler.remote.decoder.NettyDecoder;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.decoder.NettyDecoder;
import org.apache.dolphinscheduler.remote.decoder.NettyEncoder;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
@ -41,11 +23,33 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
/**
* NettyClient
*/
public class NettyClient {
public static NettyClient getInstance() {
return NettyClient.NettyClientInner.INSTANCE;
}
private static class NettyClientInner {
private static final NettyClient INSTANCE = new NettyClient(new NettyClientConfig());
}
private final Logger logger = LoggerFactory.getLogger(NettyClient.class);
/**
@ -160,38 +164,43 @@ public class NettyClient {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new NettyEncoder(RpcRequest.class)) //OUT - 1
.addLast(new NettyEncoder(RpcRequest.class))
.addLast(new NettyDecoder(RpcResponse.class))
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
.addLast(new NettyClientHandler());
}
});
isStarted.compareAndSet(false, true);
System.out.println("netty client start");
}
public Object sendMsg(Host host, RpcRequest request, Boolean async) {
public RpcResponse sendMsg(Host host, RpcRequest request, Boolean async) {
System.out.println("这个不是异步"+async);
Channel channel = getChannel(host);
assert channel != null;
RpcRequestCache rpcRequestCache = new RpcRequestCache();
rpcRequestCache.setServiceName(request.getClassName() + request.getMethodName());
RpcFuture future = new RpcFuture();
rpcRequestCache.setRpcFuture(future);
RpcFuture future = null;
if (!async) {
future = new RpcFuture();
rpcRequestCache.setRpcFuture(future);
}
RpcRequestTable.put(request.getRequestId(), rpcRequestCache);
channel.writeAndFlush(request);
Object result = null;
RpcResponse result = null;
if (async) {
return true;
result=new RpcResponse();
result.setStatus((byte)0);
result.setResult(true);
return result;
}
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
logger.error("send msg error",e);
}
return result;
}

33
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java

@ -3,9 +3,11 @@ package org.apache.dolphinscheduler.remote.rpc.remote;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfig;
import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfigCache;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
@ -36,11 +38,36 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RpcResponse rsp = (RpcResponse) msg;
RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId());
if (null != rpcRequest) {
if (null == rpcRequest) {
logger.warn("未知响应");
return;
}
String serviceName = rpcRequest.getServiceName();
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (!consumerConfig.getAsync()) {
RpcFuture future = rpcRequest.getRpcFuture();
RpcRequestTable.remove(rsp.getRequestId());
future.done(rsp);
return;
}
//async
new FastThreadLocalThread(() -> {
try {
if (rsp.getStatus() == 0) {
consumerConfig.getCallBackClass().newInstance().run(rsp.getResult());
} else {
logger.error("xxxx fail");
}
} catch (InstantiationException | IllegalAccessException e) {
logger.error("execute async error", e);
}
}).start();
}
@Override
@ -51,7 +78,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
RpcRequest request = new RpcRequest();
request.setMethodName("heart");
ctx.channel().writeAndFlush(request);
logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息...");
logger.debug("send heart beat msg...");
} else {
super.userEventTriggered(ctx, evt);

29
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

@ -1,18 +1,17 @@
package org.apache.dolphinscheduler.remote.rpc.remote;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
/**
* NettyServerHandler
*/
@ -39,17 +38,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException, InstantiationException {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RpcRequest req = (RpcRequest) msg;
RpcResponse response = new RpcResponse();
if(req.getMethodName().equals("heart")){
if (req.getMethodName().equals("heart")) {
logger.info("接受心跳消息!...");
return;
}
response.setRequestId(req.getRequestId());
response.setStatus((byte) 0);
String classname = req.getClassName();
@ -58,14 +59,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
Class<?>[] parameterTypes = req.getParameterTypes();
Object[] arguments = req.getParameters();
Object result = null;
try {
Class serviceClass = Class.forName(classname);
Class serviceClass = Class.forName(classname);
Object object = serviceClass.newInstance();
Object object = serviceClass.newInstance();
Method method = serviceClass.getMethod(methodName, parameterTypes);
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(object, arguments);
result = method.invoke(object, arguments);
} catch (Exception e) {
logger.error("netty server execute error",e);
response.setStatus((byte)-1);
}
response.setResult(result);
ctx.writeAndFlush(response);

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java

@ -8,8 +8,7 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author jiangli
* @date 2021-01-11 12:00
* RandomSelector
*/
public class RandomSelector extends AbstractSelector<Host> {

Loading…
Cancel
Save