diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java index 7bac0361f4..4ec8a0f7a7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java @@ -56,7 +56,7 @@ public class NettyServerConfig { /** * listen port */ - private int listenPort = 12636; + private int listenPort = 12346; public int getListenPort() { return listenPort; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java index c381b3fdc4..b732a50680 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java @@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; /** diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/Invoker.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/Invoker.java deleted file mode 100644 index e93dfa4e66..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/Invoker.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc; - -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; - -/** - * Invoker - */ -public interface Invoker { - - RpcResponse invoke(RpcRequest req) throws Throwable; -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainServerTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainServerTest.java deleted file mode 100644 index 88e0f615f1..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainServerTest.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc; - -import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer; - -/** - * @author jiangli - * @date 2021-01-20 14:54 - */ -public class MainServerTest { - - public static void main(String[] args) { - NettyServer nettyServer = new NettyServer(new NettyServerConfig()); - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java deleted file mode 100644 index 4115184f22..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc; - -import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient; -import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; -import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer; -import org.apache.dolphinscheduler.remote.utils.Host; - -/** - * @author jiangli - * @date 2021-01-11 21:06 - */ -public class MainTest { - - public static void main(String[] args) throws Exception { - - - // NettyServer nettyServer = new NettyServer(new NettyServerConfig()); - - // NettyClient nettyClient=new NettyClient(new NettyClientConfig()); - - Host host = new Host("127.0.0.1", 12636); - - IRpcClient rpcClient = new RpcClient(); - IUserService userService = rpcClient.create(IUserService.class, host); - boolean result = userService.say("calvin"); - System.out.println("异步回掉成功" + result); - - System.out.println(userService.hi(10)); - System.out.println(userService.hi(188888888)); - - IUserService user = rpcClient.create(IUserService.class, host); - System.out.println(user.hi(99999)); - System.out.println(user.hi(998888888)); - - System.out.println(IUserService.class.getSimpleName()); - System.out.println(UserService.class.getSimpleName()); - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java deleted file mode 100644 index fe712dfbe3..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc; - -import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; - -/** - * @author jiangli - * @date 2021-01-15 07:32 - */ -public class UserCallback extends AbstractRpcCallBack { - @Override - public void run(Object object) { - Boolean msg= (Boolean) object; - System.out.println("我是异步回调handle Kris"+msg); - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java deleted file mode 100644 index 65734074fe..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc; - -import org.apache.dolphinscheduler.remote.rpc.base.Rpc; -import org.apache.dolphinscheduler.remote.rpc.base.RpcService; - -/** - * @author jiangli - * @date 2021-01-11 21:05 - */ -@RpcService("IUserService") -public class UserService implements IUserService{ - - @Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999) - @Override - public Boolean say(String s) { - return true; - } - - @Override - public String hi(int num) { - return "this world has " + num + "sun"; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java similarity index 92% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java index 7ccee632b9..1fc6ca3627 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.base; +package org.apache.dolphinscheduler.rpc.base; -import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; +import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/RpcService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/RpcService.java similarity index 95% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/RpcService.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/RpcService.java index 4f2407413b..5ebb2cfe7a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/RpcService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/RpcService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.base; +package org.apache.dolphinscheduler.rpc.base; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java similarity index 83% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java index 897eb688cc..10d6cd5c4b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.client; +package org.apache.dolphinscheduler.rpc.client; -import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; -import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants; +import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack; +import org.apache.dolphinscheduler.rpc.common.ConsumerConfigConstants; /** * We will cache the consumer configuration, when the rpc call is generated, the consumer configuration will be first obtained from here @@ -39,7 +39,7 @@ public class ConsumerConfig { return serviceCallBackClass; } - public void setServiceCallBackClass(Class serviceCallBackClass) { + void setServiceCallBackClass(Class serviceCallBackClass) { this.serviceCallBackClass = serviceCallBackClass; } @@ -47,7 +47,7 @@ public class ConsumerConfig { return ackCallBackClass; } - public void setAckCallBackClass(Class ackCallBackClass) { + void setAckCallBackClass(Class ackCallBackClass) { this.ackCallBackClass = ackCallBackClass; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java similarity index 95% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java index f8d7d570b0..a407079fc5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.client; +package org.apache.dolphinscheduler.rpc.client; import java.util.concurrent.ConcurrentHashMap; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java similarity index 92% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java index 2b90c429b3..f976d0b1d6 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.client; +package org.apache.dolphinscheduler.rpc.client; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; -import org.apache.dolphinscheduler.remote.rpc.base.Rpc; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.rpc.base.Rpc; +import org.apache.dolphinscheduler.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.rpc.remote.NettyClient; import java.lang.reflect.Method; import java.util.UUID; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/IRpcClient.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/IRpcClient.java index ac7574ed1a..979a979e88 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/IRpcClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.client; +package org.apache.dolphinscheduler.rpc.client; import org.apache.dolphinscheduler.remote.utils.Host; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcClient.java similarity index 77% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcClient.java index b8d1202433..02f99b4f62 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcClient.java @@ -15,14 +15,12 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.client; +package org.apache.dolphinscheduler.rpc.client; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; import org.apache.dolphinscheduler.remote.utils.Host; -import java.util.concurrent.ConcurrentHashMap; - import net.bytebuddy.ByteBuddy; import net.bytebuddy.implementation.MethodDelegation; @@ -31,22 +29,14 @@ import net.bytebuddy.implementation.MethodDelegation; */ public class RpcClient implements IRpcClient { - private ConcurrentHashMap classMap = new ConcurrentHashMap<>(); - @Override public T create(Class clazz, Host host) throws Exception { - // if(!classMap.containsKey(clazz.getName())){ - T proxy = new ByteBuddy() + return new ByteBuddy() .subclass(clazz) .method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(host))) .make() .load(getClass().getClassLoader()) .getLoaded() .getDeclaredConstructor().newInstance(); - - // classMap.putIfAbsent(clazz.getName(),proxy); - return proxy; - // } - // return (T) classMap.get(clazz.getName()); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestCache.java similarity index 91% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestCache.java index 153ceba293..14864d6c8b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestCache.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.client; +package org.apache.dolphinscheduler.rpc.client; -import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; +import org.apache.dolphinscheduler.rpc.future.RpcFuture; /** * RpcRequestCache diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java similarity index 96% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java index ae31252d9a..f3cdff4f77 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.client; +package org.apache.dolphinscheduler.rpc.client; import java.util.concurrent.ConcurrentHashMap; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/AbstractRpcCallBack.java similarity index 95% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/AbstractRpcCallBack.java index 758b8474bb..2b106a96e4 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/AbstractRpcCallBack.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.common; +package org.apache.dolphinscheduler.rpc.common; /** * AbstractRpcCallBack diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java index d23307aedf..478c10432c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.common; +package org.apache.dolphinscheduler.rpc.common; /** * ConsumerConfigConstants diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RequestEventType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RequestEventType.java similarity index 95% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RequestEventType.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RequestEventType.java index c2f8a8f1dd..558c88067b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RequestEventType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RequestEventType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.common; +package org.apache.dolphinscheduler.rpc.common; public enum RequestEventType { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ResponseEventType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ResponseEventType.java similarity index 95% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ResponseEventType.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ResponseEventType.java index 4c0d72181c..66ca157e23 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ResponseEventType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ResponseEventType.java @@ -1,4 +1,4 @@ -package org.apache.dolphinscheduler.remote.rpc.common;/* +package org.apache.dolphinscheduler.rpc.common;/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -29,7 +29,6 @@ public enum ResponseEventType { this.description = description; } - public Byte getType() { return type; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java similarity index 97% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java index 3c8732c61f..a22c8f8a93 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.common; +package org.apache.dolphinscheduler.rpc.common; /** * RpcRequest diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcResponse.java similarity index 96% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcResponse.java index 9e6db6ec11..6aafc039aa 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcResponse.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.common; +package org.apache.dolphinscheduler.rpc.common; /** * RpcResponse diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java new file mode 100644 index 0000000000..cb4c33a23a --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.rpc.common; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; +import java.util.concurrent.TimeUnit; + +public enum ThreadPoolManager { + + INSTANCE; + + ExecutorService executorService; + + ThreadPoolManager() { + int SIZE_WORK_QUEUE = 200; + long KEEP_ALIVE_TIME = 60; + int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; + int MAXI_MUM_POOL_SIZE = CORE_POOL_SIZE * 4; + executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXI_MUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(SIZE_WORK_QUEUE), + new DiscardPolicy()); + } + + public void addExecuteTask(Runnable task) { + executorService.submit(task); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/config/ServiceBean.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java similarity index 86% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/config/ServiceBean.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java index a0fe93c5be..f51f35b3e6 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/config/ServiceBean.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java @@ -15,16 +15,14 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.config; +package org.apache.dolphinscheduler.rpc.config; -import org.apache.dolphinscheduler.remote.rpc.IUserService; -import org.apache.dolphinscheduler.remote.rpc.base.RpcService; +import org.apache.dolphinscheduler.rpc.base.RpcService; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.ServiceLoader; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,7 +30,9 @@ import org.reflections.Reflections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * ServiceBean find all rpcService + */ public class ServiceBean { private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class); @@ -42,9 +42,8 @@ public class ServiceBean { private static AtomicBoolean initialized = new AtomicBoolean(false); private static synchronized void init() { - Reflections f = new Reflections("org/apache/dolphinscheduler/remote/rpc"); - - + // todo config + Reflections f = new Reflections("org/apache/dolphinscheduler/rpc"); List> list = new ArrayList<>(f.getTypesAnnotatedWith(RpcService.class)); list.forEach(rpcClass -> { RpcService rpcService = rpcClass.getAnnotation(RpcService.class); @@ -52,10 +51,6 @@ public class ServiceBean { }); } - public static void main(String[] args) { - init(); - } - public static Class getServiceClass(String className) { if (initialized.get()) { return (Class) serviceMap.get(className); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java index 60f25294bb..8cf1dfaa22 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.future; +package org.apache.dolphinscheduler.rpc.future; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.rpc.common.RpcResponse; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java similarity index 76% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java index 0da43ce0dc..ed7aaa05ef 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.remote; +package org.apache.dolphinscheduler.rpc.remote; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; -import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache; -import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; +import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; +import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; +import org.apache.dolphinscheduler.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.rpc.future.RpcFuture; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NettyUtils; @@ -98,7 +98,7 @@ public class NettyClient { /** * get channel */ - public Channel getChannel(Host host) { + private Channel getChannel(Host host) { Channel channel = channels.get(host); if (channel != null && channel.isActive()) { return channel; @@ -138,7 +138,7 @@ public class NettyClient { * * @param clientConfig client config */ - public NettyClient(final NettyClientConfig clientConfig) { + private NettyClient(final NettyClientConfig clientConfig) { this.clientConfig = clientConfig; if (NettyUtils.useEpoll()) { this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { @@ -169,24 +169,24 @@ public class NettyClient { private void start() { this.bootstrap - .group(this.workerGroup) - .channel(NettyUtils.getSocketChannelClass()) - .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) - .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) - .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) - .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) - .handler(new LoggingHandler(LogLevel.DEBUG)) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) { - ch.pipeline() - .addLast(new NettyEncoder(RpcRequest.class)) - .addLast(new NettyDecoder(RpcResponse.class)) - .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) - .addLast(new NettyClientHandler()); - } - }); + .group(this.workerGroup) + .channel(NettyUtils.getSocketChannelClass()) + .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) + .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) + .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) + .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline() + .addLast(new NettyEncoder(RpcRequest.class)) + .addLast(new NettyDecoder(RpcResponse.class)) + .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) + .addLast(new NettyClientHandler()); + } + }); isStarted.compareAndSet(false, true); } @@ -196,9 +196,8 @@ public class NettyClient { Channel channel = getChannel(host); assert channel != null; RpcRequestCache rpcRequestCache = new RpcRequestCache(); - rpcRequestCache.setServiceName(request.getClassName() + request.getMethodName()); - - + String serviceName = request.getClassName() + request.getMethodName(); + rpcRequestCache.setServiceName(serviceName); RpcFuture future = null; if (!async) { future = new RpcFuture(); @@ -209,15 +208,15 @@ public class NettyClient { RpcResponse result = null; if (async) { - result=new RpcResponse(); - result.setStatus((byte)0); + result = new RpcResponse(); + result.setStatus((byte) 0); result.setResult(true); return result; } try { result = future.get(); } catch (InterruptedException | ExecutionException e) { - logger.error("send msg error",e); + logger.error("send msg error,service name is {}", serviceName, e); } return result; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java similarity index 74% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java index 97d4bb0444..ec4f97255a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java @@ -15,21 +15,16 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.remote; +package org.apache.dolphinscheduler.rpc.remote; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.concurrent.FastThreadLocalThread; - -import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfig; -import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfigCache; -import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache; -import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; +import org.apache.dolphinscheduler.rpc.client.ConsumerConfig; +import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache; +import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; +import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; +import org.apache.dolphinscheduler.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; +import org.apache.dolphinscheduler.rpc.future.RpcFuture; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; @@ -37,6 +32,12 @@ import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.FastThreadLocalThread; + /** * NettyClientHandler */ @@ -46,6 +47,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); + private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; + @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); @@ -58,10 +61,13 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId()); if (null == rpcRequest) { - logger.warn("未知响应"); + logger.warn("rpc read error,this request does not exist"); return; } + threadPoolManager.addExecuteTask(() -> readHandler(rsp, rpcRequest)); + } + private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest) { String serviceName = rpcRequest.getServiceName(); ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); if (!consumerConfig.getAsync()) { @@ -71,7 +77,6 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { return; } - //async new FastThreadLocalThread(() -> { try { @@ -79,18 +84,16 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { try { consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult()); } catch (InvocationTargetException | NoSuchMethodException e) { - logger.error("rpc call back error",e); + logger.error("rpc call back error, serviceName {} ", serviceName, e); } } else { - logger.error("xxxx fail"); + logger.error("rpc response error ,serviceName {}", serviceName); } } catch (InstantiationException | IllegalAccessException e) { - logger.error("execute async error", e); + logger.error("execute async error,serviceName {}", serviceName, e); } }).start(); - - } @Override @@ -99,7 +102,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; RpcRequest request = new RpcRequest(); - request.setEventType((byte)0); + request.setEventType((byte) 0); ctx.channel().writeAndFlush(request); logger.debug("send heart beat msg..."); @@ -110,12 +113,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - System.out.println("exceptionCaught"); logger.error("exceptionCaught : {}", cause.getMessage(), cause); ctx.channel().close(); } - private void executeAsyncHandler(){ - - } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServer.java similarity index 79% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServer.java index eca355a483..cbba950c23 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServer.java @@ -15,29 +15,15 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.remote; +package org.apache.dolphinscheduler.rpc.remote; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.handler.timeout.IdleStateHandler; - -import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; - +import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; - -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NettyUtils; +import org.apache.dolphinscheduler.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.rpc.common.RpcResponse; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -47,12 +33,24 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; + /** * NettyServer */ public class NettyServer { - private static final Logger logger =LoggerFactory.getLogger(NettyServer.class); + private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); /** * boss group @@ -126,29 +124,28 @@ public class NettyServer { this.start(); } - /** * server start */ public void start() { if (isStarted.compareAndSet(false, true)) { this.serverBootstrap - .group(this.bossGroup, this.workGroup) - .channel(NettyUtils.getServerSocketChannelClass()) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) - .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) - .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) - .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) - .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) - .handler(new LoggingHandler(LogLevel.DEBUG)) - .childHandler(new ChannelInitializer() { - - @Override - protected void initChannel(SocketChannel ch){ - initNettyChannel(ch); - } - }); + .group(this.bossGroup, this.workGroup) + .channel(NettyUtils.getServerSocketChannelClass()) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) + .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) + .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) + .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) + .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { + initNettyChannel(ch); + } + }); ChannelFuture future; try { @@ -165,7 +162,6 @@ public class NettyServer { throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); } } - System.out.println("netty ser ver start"); } /** @@ -175,13 +171,12 @@ public class NettyServer { */ private void initNettyChannel(SocketChannel ch) { ch.pipeline() - .addLast(new NettyDecoder(RpcRequest.class)) - .addLast(new NettyEncoder(RpcResponse.class)) - .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) - .addLast("handler", new NettyServerHandler()); + .addLast(new NettyDecoder(RpcRequest.class)) + .addLast(new NettyEncoder(RpcResponse.class)) + .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) + .addLast("handler", new NettyServerHandler()); } - public void close() { if (isStarted.compareAndSet(true, false)) { try { @@ -199,5 +194,4 @@ public class NettyServer { } } - } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java similarity index 84% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java index 9556ec1891..ae4ccaab6e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java @@ -15,18 +15,14 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.rpc.remote; +package org.apache.dolphinscheduler.rpc.remote; -import org.apache.dolphinscheduler.remote.rpc.IUserService; -import org.apache.dolphinscheduler.remote.rpc.base.RpcService; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.rpc.config.ServiceBean; +import org.apache.dolphinscheduler.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; +import org.apache.dolphinscheduler.rpc.config.ServiceBean; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; -import java.util.ServiceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,19 +38,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); + private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); } - @Override public void channelInactive(ChannelHandlerContext ctx) { logger.info("channel close"); ctx.channel().close(); } - @Override public void channelActive(ChannelHandlerContext ctx) { logger.info("client connect success !" + ctx.channel().remoteAddress()); @@ -65,19 +61,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { RpcRequest req = (RpcRequest) msg; - RpcResponse response = new RpcResponse(); if (req.getEventType() == 0) { logger.info("accept heartbeat msg"); return; } - //todo 使用业务线程池去处理 不要占用netty的资源 - response.setRequestId(req.getRequestId()); + threadPoolManager.addExecuteTask(() -> readHandler(ctx, req)); + } + private void readHandler(ChannelHandlerContext ctx, RpcRequest req) { + RpcResponse response = new RpcResponse(); + response.setRequestId(req.getRequestId()); response.setStatus((byte) 0); - String classname = req.getClassName(); String methodName = req.getMethodName(); @@ -95,7 +92,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { result = method.invoke(object, arguments); } catch (Exception e) { - logger.error("netty server execute error", e); + logger.error("netty server execute error,service name {}", classname + methodName, e); response.setStatus((byte) -1); } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java new file mode 100644 index 0000000000..5a10de8d73 --- /dev/null +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.rpc; + +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.rpc.client.IRpcClient; +import org.apache.dolphinscheduler.rpc.client.RpcClient; +import org.apache.dolphinscheduler.rpc.remote.NettyClient; +import org.apache.dolphinscheduler.rpc.remote.NettyServer; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class RpcTest { + private NettyServer nettyServer; + + private IUserService userService; + + private Host host; + + @Before + public void before() throws Exception { + nettyServer = new NettyServer(new NettyServerConfig()); + IRpcClient rpcClient = new RpcClient(); + host = new Host("127.0.0.1", 12346); + userService = rpcClient.create(IUserService.class, host); + + } + + @Test + public void sendTest() { + Integer result = userService.hi(3); + Assert.assertSame(4, result); + result = userService.hi(4); + Assert.assertSame(5, result); + userService.say("sync"); + } + + @After + public void after() { + NettyClient.getInstance().close(); + nettyServer.close(); + } + +} diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java new file mode 100644 index 0000000000..882cace115 --- /dev/null +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.rpc; + +import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack; + +/** + * UserCallback + */ +public class UserCallback extends AbstractRpcCallBack { + @Override + public void run(Object object) { + + } +} diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java new file mode 100644 index 0000000000..bd0919ebb9 --- /dev/null +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java @@ -0,0 +1,20 @@ +package org.apache.dolphinscheduler.rpc; + +import org.apache.dolphinscheduler.rpc.base.RpcService; + +/** + * UserService + */ +@RpcService("IUserService") +public class UserService implements IUserService{ + + @Override + public Boolean say(String s) { + return true; + } + + @Override + public Integer hi(int num) { + return ++num; + } +} diff --git a/pom.xml b/pom.xml index af4210dce4..7ab8117f6d 100644 --- a/pom.xml +++ b/pom.xml @@ -868,6 +868,7 @@ **/dao/entity/UdfFuncTest.java **/remote/JsonSerializerTest.java **/remote/RemoveTaskLogResponseCommandTest.java + **/rpc/RpcTest.java **/remote/RemoveTaskLogRequestCommandTest.java **/remote/NettyRemotingClientTest.java **/remote/NettyUtilTest.java