diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java index 335759b2de..655aa2730e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java @@ -40,6 +40,8 @@ public @interface Rpc { boolean ack() default false; + boolean callBack() default false; + //todo It is better to set the timeout period for synchronous calls /** diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java index 10d6cd5c4b..331b0fdf59 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java @@ -33,6 +33,8 @@ public class ConsumerConfig { private Boolean async = ConsumerConfigConstants.DEFAULT_SYNC; + private Boolean callBack = ConsumerConfigConstants.DEFAULT_CALL_BACK; + private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES; public Class getServiceCallBackClass() { @@ -74,4 +76,12 @@ public class ConsumerConfig { void setRetries(Integer retries) { this.retries = retries; } + + public Boolean getCallBack() { + return callBack; + } + + public void setCallBack(Boolean callBack) { + this.callBack = callBack; + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java index ad7b5d2c09..0c79a81abd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.rpc.client; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.rpc.base.Rpc; +import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack; import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.protocol.EventType; @@ -102,6 +103,9 @@ public class ConsumerInterceptor { Rpc rpc = method.getAnnotation(Rpc.class); consumerConfig.setAsync(rpc.async()); consumerConfig.setServiceCallBackClass(rpc.serviceCallback()); + if (!rpc.serviceCallback().isInstance(AbstractRpcCallBack.class)) { + consumerConfig.setCallBack(true); + } consumerConfig.setAckCallBackClass(rpc.ackCallback()); consumerConfig.setRetries(rpc.retries()); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java index def8fe10d4..e07c55488c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java @@ -29,4 +29,6 @@ public class ConsumerConfigConstants { public static final Boolean DEFAULT_SYNC = false; public static final Integer DEFAULT_RETRIES = 3; + + public static final Boolean DEFAULT_CALL_BACK = false; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java index 6bc61cff40..e810af2ce7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java @@ -77,7 +77,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { RpcRequestTable.remove(reqId); future.done(rsp); return; + } + if (Boolean.FALSE.equals(consumerConfig.getCallBack())) { + return; } if (rsp.getStatus() == 0) { diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/IUserService.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/IUserService.java index 66015bae11..7beee00106 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/IUserService.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/IUserService.java @@ -24,8 +24,11 @@ import org.apache.dolphinscheduler.rpc.base.Rpc; */ public interface IUserService { - @Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999) + @Rpc(async = true, serviceCallback = UserCallback.class) Boolean say(String s); Integer hi(int num); + + @Rpc(async = true) + Boolean callBackIsFalse(String s); } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java index bf62e467e9..bd4211ea1a 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java @@ -51,6 +51,8 @@ public class RpcTest { result = userService.hi(4); Assert.assertSame(5, result); userService.say("sync"); + userService.callBackIsFalse("async no call back"); + userService.hi(999999); } @After diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java index 882cace115..72c3c705ed 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java @@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.rpc; import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * UserCallback */ public class UserCallback extends AbstractRpcCallBack { + + private static final Logger logger = LoggerFactory.getLogger(UserCallback.class); + @Override public void run(Object object) { - + String msg = (String) object; + logger.debug("Kris---------------------------------userCallBack msg is {}", msg); } } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java index ba990d0772..80fb69678a 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java @@ -19,21 +19,34 @@ package org.apache.dolphinscheduler.rpc; import org.apache.dolphinscheduler.rpc.base.RpcService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * UserService */ @RpcService("IUserService") public class UserService implements IUserService { + private static final Logger logger = LoggerFactory.getLogger(UserService.class); + @Override public Boolean say(String s) { + + logger.info("Kris UserService say-------------------------------Synchronous call msg{}", s); return true; } @Override public Integer hi(int num) { - System.out.println("hihihihi+" + num); + logger.info("Kris UserService hi-------------------------------async call msg{}", num); return ++num; } + + @Override + public Boolean callBackIsFalse(String s) { + logger.info("Kris UserService callBackIsFalse-------------------------------async call msg{}", s); + return null; + } }