Browse Source

add scanFutureTable and testcase (#1971)

pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
5cc2ef5ce6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  2. 48
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
  3. 58
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/FastJsonSerializerTest.java
  4. 10
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
  5. 60
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java

16
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -96,6 +96,11 @@ public class NettyRemotingClient {
*/
private final NettyClientHandler clientHandler;
/**
* response future executor
*/
private final ScheduledExecutorService responseFutureExecutor;
/**
* client init
* @param clientConfig client config
@ -115,6 +120,8 @@ public class NettyRemotingClient {
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
this.start();
}
@ -139,6 +146,12 @@ public class NettyRemotingClient {
encoder);
}
});
this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ResponseFuture.scanFutureTable();
}
}, 5000, 1000, TimeUnit.MILLISECONDS);
//
isStarted.compareAndSet(false, true);
}
@ -306,6 +319,9 @@ public class NettyRemotingClient {
if(callbackExecutor != null){
this.callbackExecutor.shutdownNow();
}
if(this.responseFutureExecutor != null){
this.responseFutureExecutor.shutdownNow();
}
} catch (Exception ex) {
logger.error("netty client close exception", ex);
}

48
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java

@ -18,7 +18,13 @@
package org.apache.dolphinscheduler.remote.future;
import org.apache.dolphinscheduler.remote.command.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
@ -26,6 +32,8 @@ import java.util.concurrent.*;
*/
public class ResponseFuture {
private final static Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class);
private final static ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
/**
@ -161,4 +169,44 @@ public class ResponseFuture {
this.releaseSemaphore.release();
}
}
@Override
public String toString() {
return "ResponseFuture{" +
"opaque=" + opaque +
", timeoutMillis=" + timeoutMillis +
", invokeCallback=" + invokeCallback +
", releaseSemaphore=" + releaseSemaphore +
", latch=" + latch +
", beginTimestamp=" + beginTimestamp +
", responseCommand=" + responseCommand +
", sendOk=" + sendOk +
", cause=" + cause +
'}';
}
/**
* scan future table
*/
public static void scanFutureTable(){
final List<ResponseFuture> futureList = new LinkedList<>();
Iterator<Map.Entry<Long, ResponseFuture>> it = FUTURE_TABLE.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, ResponseFuture> next = it.next();
ResponseFuture future = next.getValue();
if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
futureList.add(future);
it.remove();
LOGGER.warn("remove timeout request : {}", future);
}
}
for (ResponseFuture future : futureList) {
try {
future.release();
future.executeInvokeCallback();
} catch (Throwable ex) {
LOGGER.warn("scanFutureTable, execute callback error", ex);
}
}
}
}

58
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/FastJsonSerializerTest.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.junit.Assert;
import org.junit.Test;
public class FastJsonSerializerTest {
@Test
public void testSerialize(){
TestObj testObj = new TestObj();
testObj.setAge(12);
byte[] serializeByte = FastJsonSerializer.serialize(testObj);
//
TestObj deserialize = FastJsonSerializer.deserialize(serializeByte, TestObj.class);
Assert.assertEquals(testObj.getAge(), deserialize.getAge());
}
static class TestObj {
private int age;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "TestObj{" +
"age=" + age +
'}';
}
}
}

10
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java

@ -1,4 +1,4 @@
package org.apache.dolphinscheduler.remote;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -15,9 +15,9 @@ package org.apache.dolphinscheduler.remote;/*
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.Ping;
@ -67,6 +67,8 @@ public class NettyRemotingClientTest {
} catch (Exception e) {
e.printStackTrace();
}
server.close();
client.close();
}
/**
@ -103,5 +105,7 @@ public class NettyRemotingClientTest {
} catch (Exception e) {
e.printStackTrace();
}
server.close();
client.close();
}
}

60
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ResponseFutureTest {
@Test
public void testScanFutureTable(){
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("executor-service"));
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ResponseFuture.scanFutureTable();
}
}, 3000, 1000, TimeUnit.MILLISECONDS);
CountDownLatch latch = new CountDownLatch(1);
InvokeCallback invokeCallback = new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
latch.countDown();
}
};
ResponseFuture future = new ResponseFuture(1, 2000, invokeCallback, null);
try {
latch.await(5000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ResponseFuture.getFuture(1) == null);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdownNow();
}
}
Loading…
Cancel
Save