From ac932ba2cfbeb1c962b45dedcb308a256c4dafea Mon Sep 17 00:00:00 2001 From: Tboy Date: Sun, 16 Feb 2020 11:53:29 +0800 Subject: [PATCH] add sendAsync method (#1962) * add sendAsync method * add sendAsync method --- .../remote/NettyRemotingClient.java | 135 ++++++++++-------- .../exceptions/RemotingTimeoutException.java | 21 ++- .../RemotingTooMuchRequestException.java | 24 ++++ .../remote/future/InvokeCallback.java | 18 ++- .../remote/future/ReleaseSemaphore.java | 41 ++++++ .../remote/future/ResponseFuture.java | 39 ++++- .../remote/handler/NettyClientHandler.java | 21 ++- .../utils/CallerThreadExecutePolicy.java | 38 +++++ .../remote/utils/NamedThreadFactory.java | 47 ++++++ .../remote/NettyRemotingClientTest.java | 39 ++++- 10 files changed, 349 insertions(+), 74 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 3dcb8c0825..e07cfd6404 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -28,20 +28,19 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException; +import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException; import org.apache.dolphinscheduler.remote.future.InvokeCallback; +import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore; import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.handler.NettyClientHandler; import org.apache.dolphinscheduler.remote.utils.Address; -import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.rmi.RemoteException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -56,18 +55,20 @@ public class NettyRemotingClient { private final NettyEncoder encoder = new NettyEncoder(); - private final ConcurrentHashMap channels = new ConcurrentHashMap(); - - private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); + private final ConcurrentHashMap channels = new ConcurrentHashMap(128); private final AtomicBoolean isStarted = new AtomicBoolean(false); private final NioEventLoopGroup workerGroup; - private final NettyClientHandler clientHandler = new NettyClientHandler(this); - private final NettyClientConfig clientConfig; + private final Semaphore asyncSemaphore = new Semaphore(200, true); + + private final ExecutorService callbackExecutor; + + private final NettyClientHandler clientHandler; + public NettyRemotingClient(final NettyClientConfig clientConfig){ this.clientConfig = clientConfig; this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { @@ -78,6 +79,10 @@ public class NettyRemotingClient { return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); } }); + this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy()); + this.clientHandler = new NettyClientHandler(this, callbackExecutor); + this.start(); } @@ -103,65 +108,79 @@ public class NettyRemotingClient { isStarted.compareAndSet(false, true); } - //TODO - public void send(final Address address, final Command command, final InvokeCallback invokeCallback) throws RemotingException { + public void sendAsync(final Address address, final Command command, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingException { final Channel channel = getChannel(address); if (channel == null) { throw new RemotingException("network error"); } - try { - channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ - logger.info("sent command {} to {}", command, address); - } else{ - logger.error("send command {} to {} failed, error {}", command, address, future.cause()); + final long opaque = command.getOpaque(); + boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + if(acquired){ + final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); + final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore); + try { + channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if(future.isSuccess()){ + responseFuture.setSendOk(true); + return; + } else { + responseFuture.setSendOk(false); + } + responseFuture.setCause(future.cause()); + responseFuture.putResponse(null); + try { + responseFuture.executeInvokeCallback(); + } catch (Throwable ex){ + logger.error("execute callback error", ex); + } finally{ + responseFuture.release(); + } } - } - }); - } catch (Exception ex) { - String msg = String.format("send command %s to address %s encounter error", command, address); - throw new RemotingException(msg, ex); + }); + } catch (Throwable ex){ + responseFuture.release(); + throw new RemotingException(String.format("send command to address: %s failed", address), ex); + } + } else{ + String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", + timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); + throw new RemotingTooMuchRequestException(message); } } - public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException { + public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException { final Channel channel = getChannel(address); if (channel == null) { throw new RemotingException(String.format("connect to : %s fail", address)); } final long opaque = command.getOpaque(); - try { - final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null); - channel.writeAndFlush(command).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if(channelFuture.isSuccess()){ - responseFuture.setSendOk(true); - return; - } else{ - responseFuture.setSendOk(false); - responseFuture.setCause(channelFuture.cause()); - responseFuture.putResponse(null); - logger.error("send command {} to address {} failed", command, address); - } - } - }); - Command result = responseFuture.waitResponse(); - if(result == null){ - if(responseFuture.isSendOK()){ - throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause()); - } else{ - throw new RemoteException(address.toString(), responseFuture.getCause()); + final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); + channel.writeAndFlush(command).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if(future.isSuccess()){ + responseFuture.setSendOk(true); + return; + } else { + responseFuture.setSendOk(false); } + responseFuture.setCause(future.cause()); + responseFuture.putResponse(null); + logger.error("send command {} to address {} failed", command, address); + } + }); + Command result = responseFuture.waitResponse(); + if(result == null){ + if(responseFuture.isSendOK()){ + throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause()); + } else{ + throw new RemotingException(address.toString(), responseFuture.getCause()); } - return result; - } catch (Exception ex) { - String msg = String.format("send command %s to address %s error", command, address); - throw new RemotingException(msg, ex); } + return result; } public Channel getChannel(Address address) { @@ -192,10 +211,6 @@ public class NettyRemotingClient { return null; } - public ExecutorService getDefaultExecutor() { - return defaultExecutor; - } - public void close() { if(isStarted.compareAndSet(true, false)){ try { @@ -203,8 +218,8 @@ public class NettyRemotingClient { if(workerGroup != null){ this.workerGroup.shutdownGracefully(); } - if(defaultExecutor != null){ - defaultExecutor.shutdown(); + if(callbackExecutor != null){ + this.callbackExecutor.shutdownNow(); } } catch (Exception ex) { logger.error("netty client close exception", ex); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java index c0d986b063..aaf9170781 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java @@ -1,8 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.exceptions; -/** - * @Author: Tboy - */ + public class RemotingTimeoutException extends RemotingException{ public RemotingTimeoutException(String message) { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java new file mode 100644 index 0000000000..5ee11a04a7 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.exceptions; + +public class RemotingTooMuchRequestException extends RemotingException{ + + public RemotingTooMuchRequestException(String message) { + super(message); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java index 6ad6a7cbee..7cf875b002 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java @@ -1,7 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dolphinscheduler.remote.future; /** - * @Author: Tboy + * invoke callback */ public interface InvokeCallback { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java new file mode 100644 index 0000000000..95a04b1f1a --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.future; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * release semaphore + */ +public class ReleaseSemaphore { + + private final Semaphore semaphore; + + private final AtomicBoolean released; + + public ReleaseSemaphore(Semaphore semaphore){ + this.semaphore = semaphore; + this.released = new AtomicBoolean(false); + } + + public void release(){ + if(this.released.compareAndSet(false, true)){ + this.semaphore.release(); + } + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java index ba2c7e36b8..a9bdb39adf 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java @@ -1,13 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.future; import org.apache.dolphinscheduler.remote.command.Command; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** - * @Author: Tboy + * response future */ public class ResponseFuture { @@ -19,6 +34,8 @@ public class ResponseFuture { private final InvokeCallback invokeCallback; + private final ReleaseSemaphore releaseSemaphore; + private final CountDownLatch latch = new CountDownLatch(1); private final long beginTimestamp = System.currentTimeMillis(); @@ -29,11 +46,11 @@ public class ResponseFuture { private volatile Throwable cause; - - public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback) { + public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback, ReleaseSemaphore releaseSemaphore) { this.opaque = opaque; this.timeoutMillis = timeoutMillis; this.invokeCallback = invokeCallback; + this.releaseSemaphore = releaseSemaphore; FUTURE_TABLE.put(opaque, this); } @@ -95,7 +112,17 @@ public class ResponseFuture { return responseCommand; } + public void setResponseCommand(Command responseCommand) { + this.responseCommand = responseCommand; + } + public InvokeCallback getInvokeCallback() { return invokeCallback; } + + public void release() { + if(this.releaseSemaphore != null){ + this.releaseSemaphore.release(); + } + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index 572957f9ab..97f6632fb3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; + /** * netty client request handler */ @@ -34,8 +36,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final NettyRemotingClient nettyRemotingClient; - public NettyClientHandler(NettyRemotingClient nettyRemotingClient){ + private final ExecutorService callbackExecutor; + + public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){ this.nettyRemotingClient = nettyRemotingClient; + this.callbackExecutor = callbackExecutor; } @Override @@ -52,8 +57,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private void processReceived(final Command responseCommand) { ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque()); if(future != null){ - future.putResponse(responseCommand); - future.executeInvokeCallback(); + future.setResponseCommand(responseCommand); + future.release(); + if(future.getInvokeCallback() != null){ + this.callbackExecutor.submit(new Runnable() { + @Override + public void run() { + future.executeInvokeCallback(); + } + }); + } else{ + future.putResponse(responseCommand); + } } else{ logger.warn("receive response {}, but not matched any request ", responseCommand); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java new file mode 100644 index 0000000000..048ea86acb --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * caller thread execute + */ +public class CallerThreadExecutePolicy implements RejectedExecutionHandler { + + private final Logger logger = LoggerFactory.getLogger(CallerThreadExecutePolicy.class); + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + logger.warn("queue is full, trigger caller thread execute"); + r.run(); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java new file mode 100644 index 0000000000..bef64c7dc1 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.utils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + + private final AtomicInteger increment = new AtomicInteger(1); + + private final String name; + + private final int count; + + public NamedThreadFactory(String name){ + this(name, 0); + } + + public NamedThreadFactory(String name, int count){ + this.name = name; + this.count = count; + } + + @Override + public Thread newThread(Runnable r) { + final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement()) + : String.format(name + "_%d", increment.getAndIncrement()); + Thread t = new Thread(r, threadName); + t.setDaemon(true); + return t; + } +} diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java index 15c556a2ca..732a995eb3 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.command.Ping; import org.apache.dolphinscheduler.remote.command.Pong; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.remote.future.InvokeCallback; +import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Address; import org.junit.Assert; @@ -36,7 +38,7 @@ public class NettyRemotingClientTest { @Test - public void testSend(){ + public void testSendSync(){ NettyServerConfig serverConfig = new NettyServerConfig(); NettyRemotingServer server = new NettyRemotingServer(serverConfig); @@ -46,6 +48,8 @@ public class NettyRemotingClientTest { channel.writeAndFlush(Pong.create(command.getOpaque())); } }); + + server.start(); // final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -58,4 +62,37 @@ public class NettyRemotingClientTest { e.printStackTrace(); } } + + @Test + public void testSendAsync(){ + NettyServerConfig serverConfig = new NettyServerConfig(); + + NettyRemotingServer server = new NettyRemotingServer(serverConfig); + server.registerProcessor(CommandType.PING, new NettyRequestProcessor() { + @Override + public void process(Channel channel, Command command) { + channel.writeAndFlush(Pong.create(command.getOpaque())); + } + }); + server.start(); + // + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient client = new NettyRemotingClient(clientConfig); + CountDownLatch latch = new CountDownLatch(1); + Command commandPing = Ping.create(); + try { + final AtomicLong opaque = new AtomicLong(0); + client.sendAsync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + opaque.set(responseFuture.getOpaque()); + latch.countDown(); + } + }); + latch.await(); + Assert.assertEquals(commandPing.getOpaque(), opaque.get()); + } catch (Exception e) { + e.printStackTrace(); + } + } }