Browse Source

async not needed callback

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
c9daf6a425
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java
  2. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java
  3. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java
  4. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java
  5. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java
  6. 5
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/IUserService.java
  7. 2
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java
  8. 9
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java
  9. 15
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/base/Rpc.java

@ -40,6 +40,8 @@ public @interface Rpc {
boolean ack() default false; boolean ack() default false;
boolean callBack() default false;
//todo It is better to set the timeout period for synchronous calls //todo It is better to set the timeout period for synchronous calls
/** /**

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfig.java

@ -33,6 +33,8 @@ public class ConsumerConfig {
private Boolean async = ConsumerConfigConstants.DEFAULT_SYNC; private Boolean async = ConsumerConfigConstants.DEFAULT_SYNC;
private Boolean callBack = ConsumerConfigConstants.DEFAULT_CALL_BACK;
private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES; private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES;
public Class<? extends AbstractRpcCallBack> getServiceCallBackClass() { public Class<? extends AbstractRpcCallBack> getServiceCallBackClass() {
@ -74,4 +76,12 @@ public class ConsumerConfig {
void setRetries(Integer retries) { void setRetries(Integer retries) {
this.retries = retries; this.retries = retries;
} }
public Boolean getCallBack() {
return callBack;
}
public void setCallBack(Boolean callBack) {
this.callBack = callBack;
}
} }

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.rpc.client;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.rpc.base.Rpc; import org.apache.dolphinscheduler.rpc.base.Rpc;
import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack;
import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.protocol.EventType; import org.apache.dolphinscheduler.rpc.protocol.EventType;
@ -102,6 +103,9 @@ public class ConsumerInterceptor {
Rpc rpc = method.getAnnotation(Rpc.class); Rpc rpc = method.getAnnotation(Rpc.class);
consumerConfig.setAsync(rpc.async()); consumerConfig.setAsync(rpc.async());
consumerConfig.setServiceCallBackClass(rpc.serviceCallback()); consumerConfig.setServiceCallBackClass(rpc.serviceCallback());
if (!rpc.serviceCallback().isInstance(AbstractRpcCallBack.class)) {
consumerConfig.setCallBack(true);
}
consumerConfig.setAckCallBackClass(rpc.ackCallback()); consumerConfig.setAckCallBackClass(rpc.ackCallback());
consumerConfig.setRetries(rpc.retries()); consumerConfig.setRetries(rpc.retries());
} }

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java

@ -29,4 +29,6 @@ public class ConsumerConfigConstants {
public static final Boolean DEFAULT_SYNC = false; public static final Boolean DEFAULT_SYNC = false;
public static final Integer DEFAULT_RETRIES = 3; public static final Integer DEFAULT_RETRIES = 3;
public static final Boolean DEFAULT_CALL_BACK = false;
} }

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java

@ -77,7 +77,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
RpcRequestTable.remove(reqId); RpcRequestTable.remove(reqId);
future.done(rsp); future.done(rsp);
return; return;
}
if (Boolean.FALSE.equals(consumerConfig.getCallBack())) {
return;
} }
if (rsp.getStatus() == 0) { if (rsp.getStatus() == 0) {

5
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/IUserService.java

@ -24,8 +24,11 @@ import org.apache.dolphinscheduler.rpc.base.Rpc;
*/ */
public interface IUserService { public interface IUserService {
@Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999) @Rpc(async = true, serviceCallback = UserCallback.class)
Boolean say(String s); Boolean say(String s);
Integer hi(int num); Integer hi(int num);
@Rpc(async = true)
Boolean callBackIsFalse(String s);
} }

2
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java

@ -51,6 +51,8 @@ public class RpcTest {
result = userService.hi(4); result = userService.hi(4);
Assert.assertSame(5, result); Assert.assertSame(5, result);
userService.say("sync"); userService.say("sync");
userService.callBackIsFalse("async no call back");
userService.hi(999999);
} }
@After @After

9
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserCallback.java

@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.rpc;
import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack; import org.apache.dolphinscheduler.rpc.common.AbstractRpcCallBack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* UserCallback * UserCallback
*/ */
public class UserCallback extends AbstractRpcCallBack { public class UserCallback extends AbstractRpcCallBack {
private static final Logger logger = LoggerFactory.getLogger(UserCallback.class);
@Override @Override
public void run(Object object) { public void run(Object object) {
String msg = (String) object;
logger.debug("Kris---------------------------------userCallBack msg is {}", msg);
} }
} }

15
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java

@ -19,21 +19,34 @@ package org.apache.dolphinscheduler.rpc;
import org.apache.dolphinscheduler.rpc.base.RpcService; import org.apache.dolphinscheduler.rpc.base.RpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* UserService * UserService
*/ */
@RpcService("IUserService") @RpcService("IUserService")
public class UserService implements IUserService { public class UserService implements IUserService {
private static final Logger logger = LoggerFactory.getLogger(UserService.class);
@Override @Override
public Boolean say(String s) { public Boolean say(String s) {
logger.info("Kris UserService say-------------------------------Synchronous call msg{}", s);
return true; return true;
} }
@Override @Override
public Integer hi(int num) { public Integer hi(int num) {
System.out.println("hihihihi+" + num); logger.info("Kris UserService hi-------------------------------async call msg{}", num);
return ++num; return ++num;
} }
@Override
public Boolean callBackIsFalse(String s) {
logger.info("Kris UserService callBackIsFalse-------------------------------async call msg{}", s);
return null;
}
} }

Loading…
Cancel
Save