diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java index e8b31e9b7c..160e5f50ff 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java @@ -9,21 +9,19 @@ import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; import java.util.List; /** - * @author jiangli - * @date 2021-01-12 18:53 + * NettyDecoder */ public class NettyDecoder extends ByteToMessageDecoder { private Class genericClass; - // 构造函数传入向反序列化的class + public NettyDecoder(Class genericClass) { this.genericClass = genericClass; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - System.out.println("decoder"); if (byteBuf.readableBytes() < 4) { return; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java index 7a4dc6dc27..c381b3fdc4 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java @@ -11,24 +11,22 @@ import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; * @author jiangli * @date 2021-01-12 18:52 */ -public class NettyEncoder extends MessageToByteEncoder { +public class NettyEncoder extends MessageToByteEncoder { private Class genericClass; - // 构造函数传入向反序列化的class public NettyEncoder(Class genericClass) { this.genericClass = genericClass; } + @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { - System.out.println("encsss"); if (genericClass.isInstance(o)) { byte[] data = ProtoStuffUtils.serialize(o); byteBuf.writeInt(data.length); byteBuf.writeBytes(data); } - } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java index 23ae0c2586..87bf405920 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java @@ -1,10 +1,13 @@ package org.apache.dolphinscheduler.remote.rpc; +import org.apache.dolphinscheduler.remote.rpc.base.Rpc; + /** * @author jiangli * @date 2021-01-11 21:05 */ public interface IUserService { + @Rpc(async = true,callback = UserCallback.class) String say(String sb); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java index 1718152f3a..df5f8d0257 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java @@ -1,12 +1,9 @@ package org.apache.dolphinscheduler.remote.rpc; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient; import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer; import org.apache.dolphinscheduler.remote.utils.Host; @@ -26,8 +23,7 @@ public class MainTest { IRpcClient rpcClient = new RpcClient(); UserService userService = rpcClient.create(UserService.class); String result = userService.say("calvin"); - System.out.println("我是你爸爸吧"+result); - + System.out.println( "异步回掉成功"+result); // nettyClient.sendMsg(host,rpcRequest); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java new file mode 100644 index 0000000000..4bd60996cb --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java @@ -0,0 +1,15 @@ +package org.apache.dolphinscheduler.remote.rpc; + +import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; + +/** + * @author jiangli + * @date 2021-01-15 07:32 + */ +public class UserCallback extends AbstractRpcCallBack { + @Override + public void run(Object object) { + String msg= (String) object; + System.out.println("我是异步回调"+msg); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java index 0dfc08ad2c..8485d10f56 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java @@ -1,11 +1,14 @@ package org.apache.dolphinscheduler.remote.rpc; +import org.apache.dolphinscheduler.remote.rpc.base.Rpc; + /** * @author jiangli * @date 2021-01-11 21:05 */ public class UserService implements IUserService{ @Override + @Rpc(async = true,callback = UserCallback.class,retries = 9999,isOneway = false) public String say(String s) { return "krris"+s; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java new file mode 100644 index 0000000000..5ad64f52f4 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java @@ -0,0 +1,25 @@ +package org.apache.dolphinscheduler.remote.rpc.base; + +import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Rpc + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface Rpc { + + int retries() default 3; + + boolean async() default false; + + boolean isOneway() default true; + + Class callback() default AbstractRpcCallBack.class; + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java new file mode 100644 index 0000000000..bb046f435e --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java @@ -0,0 +1,61 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; +import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants; + +/** + * ConsumerConfig + */ +public class ConsumerConfig { + + private Class callBackClass; + + private String serviceName; + + private Boolean async = ConsumerConfigConstants.DEFAULT_SYNC; + + private Boolean isOneway = ConsumerConfigConstants.DEFAULT_IS_ONEWAY; + + private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES; + + + public Class getCallBackClass() { + return callBackClass; + } + + public void setCallBackClass(Class callBackClass) { + this.callBackClass = callBackClass; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public Boolean getAsync() { + return async; + } + + public void setAsync(Boolean async) { + this.async = async; + } + + public Boolean getOneway() { + return isOneway; + } + + public void setOneway(Boolean oneway) { + isOneway = oneway; + } + + public Integer getRetries() { + return retries; + } + + public void setRetries(Integer retries) { + this.retries = retries; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java new file mode 100644 index 0000000000..898a1c0827 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java @@ -0,0 +1,19 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * ConsumerConfigCache + */ +public class ConsumerConfigCache { + + private static ConcurrentHashMap consumerMap=new ConcurrentHashMap<>(); + + public static ConsumerConfig getConfigByServersName(String serviceName){ + return consumerMap.get(serviceName); + } + + public static void putConfig(String serviceName,ConsumerConfig consumerConfig){ + consumerMap.putIfAbsent(serviceName,consumerConfig); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java index 3058cf0285..ccc0fb358d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java @@ -6,6 +6,7 @@ import net.bytebuddy.implementation.bind.annotation.RuntimeType; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.rpc.Invoker; +import org.apache.dolphinscheduler.remote.rpc.base.Rpc; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain; import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; @@ -13,6 +14,7 @@ import org.apache.dolphinscheduler.remote.utils.Host; import java.lang.reflect.Method; import java.util.UUID; +import java.util.function.Consumer; /** * ConsumerInterceptor @@ -24,6 +26,8 @@ public class ConsumerInterceptor { private FilterChain filterChain; + private NettyClient nettyClient=new NettyClient(new NettyClientConfig()); + public ConsumerInterceptor(Invoker invoker) { this.filterChain = new FilterChain(invoker); this.invoker = this.filterChain.buildFilterChain(); @@ -33,13 +37,19 @@ public class ConsumerInterceptor { @RuntimeType public Object intercept(@AllArguments Object[] args, @Origin Method method) throws Throwable { RpcRequest request = buildReq(args, method); - //todo - System.out.println(invoker.invoke(request)); - NettyClient nettyClient = new NettyClient(new NettyClientConfig()); - Host host = new Host("127.0.0.1", 12336); - return nettyClient.sendMsg(host, request); + String serviceName = method.getDeclaringClass().getName() + method; + ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); + if (null == consumerConfig) { + consumerConfig = cacheServiceConfig(method, serviceName); + } + boolean async = consumerConfig.getAsync(); + + //load balance + Host host = new Host("127.0.0.1", 12336); + + return nettyClient.sendMsg(host, request, async); } private RpcRequest buildReq(Object[] args, Method method) { @@ -48,8 +58,28 @@ public class ConsumerInterceptor { request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); + request.setParameters(args); + + String serviceName = method.getDeclaringClass().getName() + method; + return request; } + private ConsumerConfig cacheServiceConfig(Method method, String serviceName) { + ConsumerConfig consumerConfig = new ConsumerConfig(); + consumerConfig.setServiceName(serviceName); + boolean annotationPresent = method.isAnnotationPresent(Rpc.class); + if (annotationPresent) { + Rpc rpc = method.getAnnotation(Rpc.class); + consumerConfig.setAsync(rpc.async()); + consumerConfig.setCallBackClass(rpc.callback()); + consumerConfig.setRetries(rpc.retries()); + consumerConfig.setOneway(rpc.isOneway()); + } + ConsumerConfigCache.putConfig(serviceName, consumerConfig); + + return consumerConfig; + } + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java index 5ef800ec5d..cc0e86a2a9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java @@ -5,8 +5,7 @@ import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; /** - * @author jiangli - * @date 2021-01-09 15:27 + * ConsumerInvoker */ public class ConsumerInvoker implements Invoker { @Override diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java index 0b384cbe62..e0e538c044 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java @@ -1,8 +1,7 @@ package org.apache.dolphinscheduler.remote.rpc.client; /** - * @author jiangli - * @date 2021-01-09 10:58 + * IRpcClient */ public interface IRpcClient { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java index b9efdf8f9f..5fa8f45f51 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java @@ -5,21 +5,18 @@ import net.bytebuddy.implementation.MethodDelegation; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.rpc.base.Rpc; import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; import java.util.concurrent.ConcurrentHashMap; /** - * @author jiangli - * @date 2021-01-09 10:59 + * RpcClient */ public class RpcClient implements IRpcClient{ private ConcurrentHashMap classMap=new ConcurrentHashMap<>(); - - - @Override public T create(Class clazz) throws Exception { if(!classMap.containsKey(clazz.getName())){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java new file mode 100644 index 0000000000..5e75322ac7 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java @@ -0,0 +1,29 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; + +/** + * RpcRequestCache + */ +public class RpcRequestCache { + + private RpcFuture rpcFuture; + + private String serviceName; + + public RpcFuture getRpcFuture() { + return rpcFuture; + } + + public void setRpcFuture(RpcFuture rpcFuture) { + this.rpcFuture = rpcFuture; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java index 677a2997e0..845f832526 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java @@ -5,24 +5,23 @@ import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; import java.util.concurrent.ConcurrentHashMap; /** - * @author jiangli - * @date 2021-01-14 10:42 + * RpcRequestTable */ public class RpcRequestTable { - // key: requestId value: RpcFuture - private static ConcurrentHashMap processingRpc = new ConcurrentHashMap<>(); - public static void put(String requestId,RpcFuture rpcFuture){ - processingRpc.put(requestId,rpcFuture); + private static ConcurrentHashMap requestMap = new ConcurrentHashMap<>(); + + public static void put(String requestId,RpcRequestCache rpcRequestCache){ + requestMap.put(requestId,rpcRequestCache); } - public static RpcFuture get(String requestId){ - return processingRpc.get(requestId); + public static RpcRequestCache get(String requestId){ + return requestMap.get(requestId); } public static void remove(String requestId){ - processingRpc.remove(requestId); + requestMap.remove(requestId); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java new file mode 100644 index 0000000000..3e632c87f6 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java @@ -0,0 +1,10 @@ +package org.apache.dolphinscheduler.remote.rpc.common; + +/** + * AbstractRpcCallBack + */ +public abstract class AbstractRpcCallBack { + + public abstract void run(Object object); + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java new file mode 100644 index 0000000000..c844cd7b7c --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java @@ -0,0 +1,14 @@ +package org.apache.dolphinscheduler.remote.rpc.common; + +/** + * ConsumerConfigConstants + */ +public class ConsumerConfigConstants { + + + public static final Boolean DEFAULT_SYNC = false; + + public static final Boolean DEFAULT_IS_ONEWAY = false; + + public static final Integer DEFAULT_RETRIES = 3; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java index 1e0eb2caf2..833ad0c73b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java @@ -1,8 +1,7 @@ package org.apache.dolphinscheduler.remote.rpc.common; /** - * @author jiangli - * @date 2021-01-09 13:21 + * RpcRequest */ public class RpcRequest { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java index 516e05b917..9a72801365 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java @@ -9,8 +9,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** - * @author jiangli - * @date 2021-01-14 09:24 + * RpcFuture */ public class RpcFuture implements Future { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java index 9fb923d893..c8a2317878 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; import org.apache.dolphinscheduler.remote.future.ResponseFuture; +import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; @@ -41,8 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author jiangli - * @date 2021-01-13 19:31 + * NettyClient */ public class NettyClient { @@ -172,24 +172,30 @@ public class NettyClient { System.out.println("netty client start"); } - public Object sendMsg(Host host, RpcRequest request) { + public Object sendMsg(Host host, RpcRequest request, Boolean async) { + + System.out.println("这个不是异步"+async); Channel channel = getChannel(host); assert channel != null; + RpcRequestCache rpcRequestCache = new RpcRequestCache(); + rpcRequestCache.setServiceName(request.getClassName() + request.getMethodName()); RpcFuture future = new RpcFuture(); - RpcRequestTable.put(request.getRequestId(), future); + rpcRequestCache.setRpcFuture(future); + RpcRequestTable.put(request.getRequestId(), rpcRequestCache); channel.writeAndFlush(request); + Object result = null; + if (async) { + return true; + } try { - result=future.get(); + result = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return result; - - } - /** * close */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java index 2c6202bca7..5e965e6063 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java @@ -1,16 +1,12 @@ package org.apache.dolphinscheduler.remote.rpc.remote; - -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; @@ -22,8 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author jiangli - * @date 2021-01-13 13:33 + * NettyClientHandler */ @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @@ -31,45 +26,35 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); - @Override - public void channelActive(ChannelHandlerContext ctx) { - - // ctx.channel().close(); - } - @Override public void channelInactive(ChannelHandlerContext ctx) { - System.out.println("client 关闭channel"); InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); ctx.channel().close(); - //todo connectManage.removeChannel(ctx.channel()); } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("收到消息"); + public void channelRead(ChannelHandlerContext ctx, Object msg) { RpcResponse rsp = (RpcResponse) msg; - RpcFuture rpcFuture= RpcRequestTable.get(rsp.getRequestId()); - if(null!=rpcFuture){ + RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId()); + if (null != rpcRequest) { + RpcFuture future = rpcRequest.getRpcFuture(); RpcRequestTable.remove(rsp.getRequestId()); - rpcFuture.done(rsp); + future.done(rsp); } - System.out.println(rsp.getResult().toString()); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent){ - IdleStateEvent event = (IdleStateEvent)evt; - if (event.state()== IdleState.ALL_IDLE){ - RpcRequest request = new RpcRequest(); - request.setMethodName("heartBeat"); - ctx.channel().writeAndFlush(request); - logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息..."); - } - }else{ - super.userEventTriggered(ctx,evt); + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + RpcRequest request = new RpcRequest(); + request.setMethodName("heart"); + ctx.channel().writeAndFlush(request); + logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息..."); + + } else { + super.userEventTriggered(ctx, evt); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java index 41495f11c8..ed88345945 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java @@ -31,8 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author jiangli - * @date 2021-01-13 19:32 + * NettyServer */ public class NettyServer { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java index b7feb22585..47bc3e70aa 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java @@ -14,12 +14,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author jiangli - * @date 2021-01-13 19:20 + * NettyServerHandler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); @@ -27,47 +27,45 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override - public void channelInactive(ChannelHandlerContext ctx){ + public void channelInactive(ChannelHandlerContext ctx) { logger.info("channel close"); - ctx.channel().close(); + ctx.channel().close(); } @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("客户端连接成功!"+ctx.channel().remoteAddress()); - logger.info("客户端连接成功!"+ctx.channel().remoteAddress()); + public void channelActive(ChannelHandlerContext ctx) { + logger.info("client connect success !" + ctx.channel().remoteAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException, InstantiationException { - logger.info("server read msg"); - System.out.println("收到消息"); - RpcRequest req= (RpcRequest) msg; - System.out.println(req.getRequestId()); - RpcResponse response=new RpcResponse(); - response.setMsg("llll"); + + RpcRequest req = (RpcRequest) msg; + + RpcResponse response = new RpcResponse(); + if(req.getMethodName().equals("heart")){ + logger.info("接受心跳消息!..."); + return; + } response.setRequestId(req.getRequestId()); - String classname=req.getClassName(); - //获得服务端要调用的方法名称 - String methodName=req.getMethodName(); - //获得服务端要调用方法的参数类型 - Class[] parameterTypes=req.getParameterTypes(); - //获得服务端要调用方法的每一个参数的值 - Object[] arguments=req.getParameters(); + String classname = req.getClassName(); + + String methodName = req.getMethodName(); + + Class[] parameterTypes = req.getParameterTypes(); + + Object[] arguments = req.getParameters(); + + Class serviceClass = Class.forName(classname); - //创建类 - Class serviceClass=Class.forName(classname); - //创建对象 Object object = serviceClass.newInstance(); - //获得该类的对应的方法 - Method method=serviceClass.getMethod(methodName, parameterTypes); - //该对象调用指定方法 - Object result=method.invoke(object, arguments); + Method method = serviceClass.getMethod(methodName, parameterTypes); + Object result = method.invoke(object, arguments); response.setResult(result); ctx.writeAndFlush(response); @@ -83,7 +81,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { System.out.println("exceptionCaught"); logger.error("exceptionCaught : {}", cause.getMessage(), cause); ctx.channel().close(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java index 4face03f90..0277273ef5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java @@ -9,8 +9,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * @author jiangli - * @date 2021-01-12 18:56 + * ProtoStuffUtils */ public class ProtoStuffUtils {