From 5cc2ef5ce63b28f2edc5b5f58997daa6a510e3a1 Mon Sep 17 00:00:00 2001 From: Tboy Date: Mon, 17 Feb 2020 22:07:19 +0800 Subject: [PATCH] add scanFutureTable and testcase (#1971) --- .../remote/NettyRemotingClient.java | 16 +++++ .../remote/future/ResponseFuture.java | 48 +++++++++++++++ .../remote/FastJsonSerializerTest.java | 58 ++++++++++++++++++ .../remote/NettyRemotingClientTest.java | 10 +++- .../remote/ResponseFutureTest.java | 60 +++++++++++++++++++ 5 files changed, 189 insertions(+), 3 deletions(-) create mode 100644 dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/FastJsonSerializerTest.java create mode 100644 dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 357fd6d19d..96258d752a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -96,6 +96,11 @@ public class NettyRemotingClient { */ private final NettyClientHandler clientHandler; + /** + * response future executor + */ + private final ScheduledExecutorService responseFutureExecutor; + /** * client init * @param clientConfig client config @@ -115,6 +120,8 @@ public class NettyRemotingClient { new CallerThreadExecutePolicy()); this.clientHandler = new NettyClientHandler(this, callbackExecutor); + this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); + this.start(); } @@ -139,6 +146,12 @@ public class NettyRemotingClient { encoder); } }); + this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + ResponseFuture.scanFutureTable(); + } + }, 5000, 1000, TimeUnit.MILLISECONDS); // isStarted.compareAndSet(false, true); } @@ -306,6 +319,9 @@ public class NettyRemotingClient { if(callbackExecutor != null){ this.callbackExecutor.shutdownNow(); } + if(this.responseFutureExecutor != null){ + this.responseFutureExecutor.shutdownNow(); + } } catch (Exception ex) { logger.error("netty client close exception", ex); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java index caff34236e..ca304646e4 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java @@ -18,7 +18,13 @@ package org.apache.dolphinscheduler.remote.future; import org.apache.dolphinscheduler.remote.command.Command; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.*; /** @@ -26,6 +32,8 @@ import java.util.concurrent.*; */ public class ResponseFuture { + private final static Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class); + private final static ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); /** @@ -161,4 +169,44 @@ public class ResponseFuture { this.releaseSemaphore.release(); } } + + @Override + public String toString() { + return "ResponseFuture{" + + "opaque=" + opaque + + ", timeoutMillis=" + timeoutMillis + + ", invokeCallback=" + invokeCallback + + ", releaseSemaphore=" + releaseSemaphore + + ", latch=" + latch + + ", beginTimestamp=" + beginTimestamp + + ", responseCommand=" + responseCommand + + ", sendOk=" + sendOk + + ", cause=" + cause + + '}'; + } + + /** + * scan future table + */ + public static void scanFutureTable(){ + final List futureList = new LinkedList<>(); + Iterator> it = FUTURE_TABLE.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + ResponseFuture future = next.getValue(); + if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { + futureList.add(future); + it.remove(); + LOGGER.warn("remove timeout request : {}", future); + } + } + for (ResponseFuture future : futureList) { + try { + future.release(); + future.executeInvokeCallback(); + } catch (Throwable ex) { + LOGGER.warn("scanFutureTable, execute callback error", ex); + } + } + } } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/FastJsonSerializerTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/FastJsonSerializerTest.java new file mode 100644 index 0000000000..97166cca70 --- /dev/null +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/FastJsonSerializerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote; + + +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.junit.Assert; +import org.junit.Test; + +public class FastJsonSerializerTest { + + @Test + public void testSerialize(){ + TestObj testObj = new TestObj(); + testObj.setAge(12); + byte[] serializeByte = FastJsonSerializer.serialize(testObj); + + // + TestObj deserialize = FastJsonSerializer.deserialize(serializeByte, TestObj.class); + + Assert.assertEquals(testObj.getAge(), deserialize.getAge()); + } + + static class TestObj { + + private int age; + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + @Override + public String toString() { + return "TestObj{" + + "age=" + age + + '}'; + } + } +} diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java index b6f8e2a8de..ef46c2c781 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java @@ -1,4 +1,4 @@ -package org.apache.dolphinscheduler.remote;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,9 +15,9 @@ package org.apache.dolphinscheduler.remote;/* * limitations under the License. */ +package org.apache.dolphinscheduler.remote; + import io.netty.channel.Channel; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.Ping; @@ -67,6 +67,8 @@ public class NettyRemotingClientTest { } catch (Exception e) { e.printStackTrace(); } + server.close(); + client.close(); } /** @@ -103,5 +105,7 @@ public class NettyRemotingClientTest { } catch (Exception e) { e.printStackTrace(); } + server.close(); + client.close(); } } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java new file mode 100644 index 0000000000..8836043257 --- /dev/null +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote; + + +import org.apache.dolphinscheduler.remote.future.InvokeCallback; +import org.apache.dolphinscheduler.remote.future.ResponseFuture; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ResponseFutureTest { + + @Test + public void testScanFutureTable(){ + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("executor-service")); + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + ResponseFuture.scanFutureTable(); + } + }, 3000, 1000, TimeUnit.MILLISECONDS); + + CountDownLatch latch = new CountDownLatch(1); + InvokeCallback invokeCallback = new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + latch.countDown(); + } + }; + ResponseFuture future = new ResponseFuture(1, 2000, invokeCallback, null); + try { + latch.await(5000, TimeUnit.MILLISECONDS); + Assert.assertTrue(ResponseFuture.getFuture(1) == null); + } catch (InterruptedException e) { + e.printStackTrace(); + } + executorService.shutdownNow(); + } +}