From 246332b076bdd917d9b7d24ff0dd27f83567f3e6 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 26 Jan 2021 00:43:54 +0800 Subject: [PATCH] fix bug --- .../remote/serialize/ProtoStuffUtils.java | 3 +++ .../rpc/future/RpcFuture.java | 21 +++++++++++++++++-- .../rpc/remote/NettyClient.java | 12 +++++------ 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java index cb04b59749..189b1a4791 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java @@ -62,6 +62,9 @@ public class ProtoStuffUtils { public static T deserialize(byte[] bytes, Class clazz) { Schema schema = getSchema(clazz); T obj = schema.newMessage(); + if(null==obj){ + return null; + } ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java index b824688a46..1d40c94795 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.rpc.future; +import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import java.util.concurrent.CountDownLatch; @@ -34,6 +35,12 @@ public class RpcFuture implements Future { private RpcResponse response; + private RpcRequest request; + + public RpcFuture(RpcRequest rpcRequest) { + this.request = rpcRequest; + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; @@ -51,13 +58,23 @@ public class RpcFuture implements Future { @Override public RpcResponse get() throws InterruptedException, ExecutionException { - latch.await(5, TimeUnit.SECONDS); + boolean success = latch.await(5, TimeUnit.SECONDS); + if (!success) { + throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId() + + ". Request class name: " + this.request.getClassName() + + ". Request method: " + this.request.getMethodName()); + } return response; } @Override public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - latch.await(timeout, unit); + boolean success = latch.await(timeout, unit); + if (!success) { + throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId() + + ". Request class name: " + this.request.getClassName() + + ". Request method: " + this.request.getMethodName()); + } return response; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java index 936f8249f4..28db8e5f32 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java @@ -18,16 +18,16 @@ package org.apache.dolphinscheduler.rpc.remote; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.rpc.codec.NettyDecoder; -import org.apache.dolphinscheduler.rpc.codec.NettyEncoder; +import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; +import org.apache.dolphinscheduler.rpc.codec.NettyDecoder; +import org.apache.dolphinscheduler.rpc.codec.NettyEncoder; import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.future.RpcFuture; -import org.apache.dolphinscheduler.remote.utils.Constants; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.remote.utils.NettyUtils; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; @@ -200,7 +200,7 @@ public class NettyClient { rpcRequestCache.setServiceName(serviceName); RpcFuture future = null; if (!async) { - future = new RpcFuture(); + future = new RpcFuture(request); rpcRequestCache.setRpcFuture(future); } RpcRequestTable.put(request.getRequestId(), rpcRequestCache);