Browse Source

fix bug

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
246332b076
  1. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java
  2. 21
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java
  3. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java

@ -62,6 +62,9 @@ public class ProtoStuffUtils {
public static <T> T deserialize(byte[] bytes, Class<T> clazz) { public static <T> T deserialize(byte[] bytes, Class<T> clazz) {
Schema<T> schema = getSchema(clazz); Schema<T> schema = getSchema(clazz);
T obj = schema.newMessage(); T obj = schema.newMessage();
if(null==obj){
return null;
}
ProtostuffIOUtil.mergeFrom(bytes, obj, schema); ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj; return obj;
} }

21
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.rpc.future; package org.apache.dolphinscheduler.rpc.future;
import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -34,6 +35,12 @@ public class RpcFuture implements Future<Object> {
private RpcResponse response; private RpcResponse response;
private RpcRequest request;
public RpcFuture(RpcRequest rpcRequest) {
this.request = rpcRequest;
}
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
return false; return false;
@ -51,13 +58,23 @@ public class RpcFuture implements Future<Object> {
@Override @Override
public RpcResponse get() throws InterruptedException, ExecutionException { public RpcResponse get() throws InterruptedException, ExecutionException {
latch.await(5, TimeUnit.SECONDS); boolean success = latch.await(5, TimeUnit.SECONDS);
if (!success) {
throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId()
+ ". Request class name: " + this.request.getClassName()
+ ". Request method: " + this.request.getMethodName());
}
return response; return response;
} }
@Override @Override
public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit); boolean success = latch.await(timeout, unit);
if (!success) {
throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId()
+ ". Request class name: " + this.request.getClassName()
+ ". Request method: " + this.request.getMethodName());
}
return response; return response;
} }

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java

@ -18,16 +18,16 @@
package org.apache.dolphinscheduler.rpc.remote; package org.apache.dolphinscheduler.rpc.remote;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.rpc.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.rpc.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.rpc.codec.NettyDecoder;
import org.apache.dolphinscheduler.rpc.codec.NettyEncoder;
import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.future.RpcFuture; import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -200,7 +200,7 @@ public class NettyClient {
rpcRequestCache.setServiceName(serviceName); rpcRequestCache.setServiceName(serviceName);
RpcFuture future = null; RpcFuture future = null;
if (!async) { if (!async) {
future = new RpcFuture(); future = new RpcFuture(request);
rpcRequestCache.setRpcFuture(future); rpcRequestCache.setRpcFuture(future);
} }
RpcRequestTable.put(request.getRequestId(), rpcRequestCache); RpcRequestTable.put(request.getRequestId(), rpcRequestCache);

Loading…
Cancel
Save