From 99d8276be7ff1f0cb47f6d67d7440a9380614073 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 8 May 2024 11:46:09 +0800 Subject: [PATCH] Optimizing the scope of RPC base classes (#15946) * Optimizing the scope of RPC base classes * Fix UT --- .../alert/rpc/AlertRpcServer.java | 16 +-- .../alert/rpc/AlertRpcServerTest.java | 28 ++--- .../api/service/LoggerServiceTest.java | 19 ++- ....java => AbstractClientMethodInvoker.java} | 5 +- .../base/client/ClientInvocationHandler.java | 5 +- .../base/client/ClientMethodInvoker.java | 2 +- .../base/client/IRpcClientProxyFactory.java | 2 +- .../JdkDynamicRpcClientProxyFactory.java | 5 +- .../base/{ => client}/NettyClientHandler.java | 18 +-- .../{ => client}/NettyRemotingClient.java | 110 ++---------------- .../NettyRemotingClientFactory.java | 2 +- ...gletonJdkDynamicRpcClientProxyFactory.java | 1 - .../base/client/SyncClientMethodInvoker.java | 5 +- .../extract/base/future/ResponseFuture.java | 76 +----------- .../base/server/JdkDynamicServerHandler.java | 12 +- .../{ => server}/NettyRemotingServer.java | 57 +++++---- .../NettyRemotingServerFactory.java | 6 +- .../extract/base/server/RpcServer.java | 74 ++++++++++++ .../base/server/ServerMethodInvoker.java | 4 +- .../base/server/ServerMethodInvokerImpl.java | 9 +- .../server/ServerMethodInvokerRegistry.java | 28 +++++ .../SpringServerMethodInvokerDiscovery.java | 37 ++---- ...onJdkDynamicRpcClientProxyFactoryTest.java | 12 +- .../server/master/rpc/MasterRpcServer.java | 18 +-- .../master/rpc/MasterRpcServerTest.java | 38 ++++++ .../microbench/rpc/RpcBenchMarkTest.java | 14 +-- .../server/worker/rpc/WorkerRpcServer.java | 18 +-- .../worker/rpc/WorkerRpcServerTest.java | 39 +++++++ 28 files changed, 301 insertions(+), 359 deletions(-) rename dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java => dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java (61%) rename dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/{BaseRemoteMethodInvoker.java => AbstractClientMethodInvoker.java} (83%) rename dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/{ => client}/NettyClientHandler.java (87%) rename dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/{ => client}/NettyRemotingClient.java (62%) rename dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/{ => client}/NettyRemotingClientFactory.java (95%) rename dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/{ => server}/NettyRemotingServer.java (75%) rename dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/{ => server}/NettyRemotingServerFactory.java (84%) create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java create mode 100644 dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java index 3bd368573a..d73e4755dd 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.alert.rpc; import org.apache.dolphinscheduler.alert.config.AlertConfig; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; @@ -31,20 +30,7 @@ import org.springframework.stereotype.Service; public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable { public AlertRpcServer(AlertConfig alertConfig) { - super(NettyRemotingServerFactory.buildNettyRemotingServer( - NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build())); + super(NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build()); } - public void start() { - log.info("Starting AlertRpcServer..."); - nettyRemotingServer.start(); - log.info("Started AlertRpcServer..."); - } - - @Override - public void close() { - log.info("Closing AlertRpcServer..."); - nettyRemotingServer.close(); - log.info("Closed AlertRpcServer..."); - } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java similarity index 61% rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java index df88de34e3..75f16848fd 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java @@ -15,22 +15,24 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.processor; +package org.apache.dolphinscheduler.alert.rpc; -import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator; +import org.apache.dolphinscheduler.alert.config.AlertConfig; -import org.mockito.Mockito; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; +import org.junit.jupiter.api.Test; -/** - * dependency config - */ -@Configuration -public class TaskResponseProcessorTestConfig { +class AlertRpcServerTest { + + private final AlertRpcServer alertRpcServer = new AlertRpcServer(new AlertConfig()); - @Bean - public DataQualityResultOperator dataQualityResultOperator() { - return Mockito.mock(DataQualityResultOperator.class); + @Test + void testStart() { + alertRpcServer.start(); } + + @Test + void testClose() { + alertRpcServer.close(); + } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 4861e1004e..972092602f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.extract.common.ILogService; @@ -91,7 +90,7 @@ public class LoggerServiceTest { @Mock private TaskDefinitionMapper taskDefinitionMapper; - private NettyRemotingServer nettyRemotingServer; + private SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery; private int nettyServerPort = 18080; @@ -103,11 +102,10 @@ public class LoggerServiceTest { return; } - nettyRemotingServer = new NettyRemotingServer(NettyServerConfig.builder().listenPort(nettyServerPort).build()); - nettyRemotingServer.start(); - SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery = - new SpringServerMethodInvokerDiscovery(nettyRemotingServer); - springServerMethodInvokerDiscovery.postProcessAfterInitialization(new ILogService() { + springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery( + NettyServerConfig.builder().serverName("TestLogServer").listenPort(nettyServerPort).build()); + springServerMethodInvokerDiscovery.start(); + springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new ILogService() { @Override public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) { @@ -142,13 +140,14 @@ public class LoggerServiceTest { public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) { } - }, "iLogServiceImpl"); + }); + springServerMethodInvokerDiscovery.start(); } @AfterEach public void tearDown() { - if (nettyRemotingServer != null) { - nettyRemotingServer.close(); + if (springServerMethodInvokerDiscovery != null) { + springServerMethodInvokerDiscovery.close(); } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java similarity index 83% rename from dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java rename to dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java index 519dd87199..b753f1efa7 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java @@ -17,12 +17,11 @@ package org.apache.dolphinscheduler.extract.base.client; -import org.apache.dolphinscheduler.extract.base.NettyRemotingClient; import org.apache.dolphinscheduler.extract.base.utils.Host; import java.lang.reflect.Method; -public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker { +abstract class AbstractClientMethodInvoker implements ClientMethodInvoker { protected final String methodIdentifier; @@ -32,7 +31,7 @@ public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker { protected final Host serverHost; - public BaseRemoteMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) { + AbstractClientMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) { this.serverHost = serverHost; this.localMethod = localMethod; this.nettyRemotingClient = nettyRemotingClient; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java index d5c9ab73d3..41ec3e056d 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.client; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.dolphinscheduler.extract.base.NettyRemotingClient; import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.utils.Host; @@ -31,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; @Slf4j -public class ClientInvocationHandler implements InvocationHandler { +class ClientInvocationHandler implements InvocationHandler { private final NettyRemotingClient nettyRemotingClient; @@ -39,7 +38,7 @@ public class ClientInvocationHandler implements InvocationHandler { private final Host serverHost; - public ClientInvocationHandler(Host serverHost, NettyRemotingClient nettyRemotingClient) { + ClientInvocationHandler(Host serverHost, NettyRemotingClient nettyRemotingClient) { this.serverHost = checkNotNull(serverHost); this.nettyRemotingClient = checkNotNull(nettyRemotingClient); this.methodInvokerMap = new ConcurrentHashMap<>(); diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java index dcf53b0311..a287fd95ce 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.extract.base.client; import java.lang.reflect.Method; -public interface ClientMethodInvoker { +interface ClientMethodInvoker { Object invoke(Object proxy, Method method, Object[] args) throws Throwable; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java index e60b0f18b0..afd3adf348 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.extract.base.client; -public interface IRpcClientProxyFactory { +interface IRpcClientProxyFactory { /** * Create the client proxy. diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java index 5635a88f34..bf329ab3fc 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.extract.base.client; -import org.apache.dolphinscheduler.extract.base.NettyRemotingClient; import org.apache.dolphinscheduler.extract.base.utils.Host; import java.lang.reflect.Proxy; @@ -34,7 +33,7 @@ import com.google.common.cache.LoadingCache; /** * This class is used to create a proxy client which will transform local method invocation to remove invocation. */ -public class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory { +class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory { private final NettyRemotingClient nettyRemotingClient; @@ -49,7 +48,7 @@ public class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory { } }); - public JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) { + JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) { this.nettyRemotingClient = nettyRemotingClient; } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java similarity index 87% rename from dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java rename to dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java index b0d998af83..be570eb577 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java @@ -15,16 +15,15 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.base; +package org.apache.dolphinscheduler.extract.base.client; +import org.apache.dolphinscheduler.extract.base.StandardRpcResponse; import org.apache.dolphinscheduler.extract.base.future.ResponseFuture; import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter; import org.apache.dolphinscheduler.extract.base.protocal.Transporter; import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer; import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils; -import java.util.concurrent.ExecutorService; - import lombok.extern.slf4j.Slf4j; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -38,11 +37,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final NettyRemotingClient nettyRemotingClient; - private final ExecutorService callbackExecutor; - - public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) { + public NettyClientHandler(NettyRemotingClient nettyRemotingClient) { this.nettyRemotingClient = nettyRemotingClient; - this.callbackExecutor = callbackExecutor; } @Override @@ -64,13 +60,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } StandardRpcResponse deserialize = JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class); future.setIRpcResponse(deserialize); - future.release(); - if (future.getInvokeCallback() != null) { - future.removeFuture(); - this.callbackExecutor.execute(future::executeInvokeCallback); - } else { - future.putResponse(deserialize); - } + future.putResponse(deserialize); } @Override diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java similarity index 62% rename from dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java rename to dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java index e4682f5224..3999f5c9f5 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java @@ -15,33 +15,24 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.base; +package org.apache.dolphinscheduler.extract.base.client; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.extract.base.IRpcResponse; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; import org.apache.dolphinscheduler.extract.base.exception.RemotingException; import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException; -import org.apache.dolphinscheduler.extract.base.exception.RemotingTooMuchRequestException; -import org.apache.dolphinscheduler.extract.base.future.InvokeCallback; -import org.apache.dolphinscheduler.extract.base.future.ReleaseSemaphore; import org.apache.dolphinscheduler.extract.base.future.ResponseFuture; import org.apache.dolphinscheduler.extract.base.protocal.Transporter; import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder; import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder; -import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy; import org.apache.dolphinscheduler.extract.base.utils.Constants; import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.NettyUtils; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,14 +62,8 @@ public class NettyRemotingClient implements AutoCloseable { private final NettyClientConfig clientConfig; - private final Semaphore asyncSemaphore = new Semaphore(1024, true); - - private final ExecutorService callbackExecutor; - private final NettyClientHandler clientHandler; - private final ScheduledExecutorService responseFutureExecutor; - public NettyRemotingClient(final NettyClientConfig clientConfig) { this.clientConfig = clientConfig; ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-"); @@ -87,18 +72,7 @@ public class NettyRemotingClient implements AutoCloseable { } else { this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory); } - this.callbackExecutor = new ThreadPoolExecutor( - Constants.CPUS, - Constants.CPUS, - 1, - TimeUnit.MINUTES, - new LinkedBlockingQueue<>(1000), - ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"), - new CallerThreadExecutePolicy()); - this.clientHandler = new NettyClientHandler(this, callbackExecutor); - - this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor( - ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-")); + this.clientHandler = new NettyClientHandler(this); this.start(); } @@ -127,66 +101,9 @@ public class NettyRemotingClient implements AutoCloseable { .addLast(new TransporterDecoder(), clientHandler, new TransporterEncoder()); } }); - this.responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS); isStarted.compareAndSet(false, true); } - public void sendAsync(final Host host, - final Transporter transporter, - final long timeoutMillis, - final InvokeCallback invokeCallback) throws InterruptedException, RemotingException { - final Channel channel = getChannel(host); - if (channel == null) { - throw new RemotingException("network error"); - } - /* - * request unique identification - */ - final long opaque = transporter.getHeader().getOpaque(); - /* - * control concurrency number - */ - boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); - if (acquired) { - final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); - - /* - * response future - */ - final ResponseFuture responseFuture = new ResponseFuture(opaque, - timeoutMillis, - invokeCallback, - releaseSemaphore); - try { - channel.writeAndFlush(transporter).addListener(future -> { - if (future.isSuccess()) { - responseFuture.setSendOk(true); - return; - } else { - responseFuture.setSendOk(false); - } - responseFuture.setCause(future.cause()); - responseFuture.putResponse(null); - try { - responseFuture.executeInvokeCallback(); - } catch (Exception ex) { - log.error("execute callback error", ex); - } finally { - responseFuture.release(); - } - }); - } catch (Exception ex) { - responseFuture.release(); - throw new RemotingException(String.format("Send transporter to host: %s failed", host), ex); - } - } else { - String message = String.format( - "try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", - timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); - throw new RemotingTooMuchRequestException(message); - } - } - public IRpcResponse sendSync(final Host host, final Transporter transporter, final long timeoutMillis) throws InterruptedException, RemotingException { final Channel channel = getChannel(host); @@ -194,7 +111,7 @@ public class NettyRemotingClient implements AutoCloseable { throw new RemotingException(String.format("connect to : %s fail", host)); } final long opaque = transporter.getHeader().getOpaque(); - final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); + final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis); channel.writeAndFlush(transporter).addListener(future -> { if (future.isSuccess()) { responseFuture.setSendOk(true); @@ -220,7 +137,7 @@ public class NettyRemotingClient implements AutoCloseable { return iRpcResponse; } - public Channel getChannel(Host host) { + private Channel getChannel(Host host) { Channel channel = channels.get(host); if (channel != null && channel.isActive()) { return channel; @@ -235,9 +152,9 @@ public class NettyRemotingClient implements AutoCloseable { * @param isSync sync flag * @return channel */ - public Channel createChannel(Host host, boolean isSync) { - ChannelFuture future; + private Channel createChannel(Host host, boolean isSync) { try { + ChannelFuture future; synchronized (bootstrap) { future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); } @@ -249,10 +166,11 @@ public class NettyRemotingClient implements AutoCloseable { channels.put(host, channel); return channel; } - } catch (Exception ex) { - log.warn(String.format("connect to %s error", host), ex); + throw new IllegalArgumentException("connect to host: " + host + " failed"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Connect to host: " + host + " failed", e); } - return null; } @Override @@ -263,12 +181,6 @@ public class NettyRemotingClient implements AutoCloseable { if (workerGroup != null) { this.workerGroup.shutdownGracefully(); } - if (callbackExecutor != null) { - this.callbackExecutor.shutdownNow(); - } - if (this.responseFutureExecutor != null) { - this.responseFutureExecutor.shutdownNow(); - } log.info("netty client closed"); } catch (Exception ex) { log.error("netty client close exception", ex); diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java similarity index 95% rename from dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java rename to dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java index 7bbebfbf3d..d14a8aa54e 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.base; +package org.apache.dolphinscheduler.extract.base.client; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java index 28d82532be..44d310e70b 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.extract.base.client; -import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; public class SingletonJdkDynamicRpcClientProxyFactory { diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java index b5fdf3fb71..4731a22d0a 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.extract.base.client; import org.apache.dolphinscheduler.extract.base.IRpcResponse; -import org.apache.dolphinscheduler.extract.base.NettyRemotingClient; import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.StandardRpcRequest; import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException; @@ -29,9 +28,9 @@ import org.apache.dolphinscheduler.extract.base.utils.Host; import java.lang.reflect.Method; -public class SyncClientMethodInvoker extends BaseRemoteMethodInvoker { +class SyncClientMethodInvoker extends AbstractClientMethodInvoker { - public SyncClientMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) { + SyncClientMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) { super(serverHost, localMethod, nettyRemotingClient); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java index 35405c5578..1fbbd9ed6c 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java @@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.extract.base.future; import org.apache.dolphinscheduler.extract.base.IRpcResponse; -import java.util.Iterator; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -34,17 +32,13 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ResponseFuture { - private static final ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); + private static final ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(); private final long opaque; // remove the timeout private final long timeoutMillis; - private final InvokeCallback invokeCallback; - - private final ReleaseSemaphore releaseSemaphore; - private final CountDownLatch latch = new CountDownLatch(1); private final long beginTimestamp = System.currentTimeMillis(); @@ -57,14 +51,9 @@ public class ResponseFuture { private Throwable cause; - public ResponseFuture(long opaque, - long timeoutMillis, - InvokeCallback invokeCallback, - ReleaseSemaphore releaseSemaphore) { + public ResponseFuture(long opaque, long timeoutMillis) { this.opaque = opaque; this.timeoutMillis = timeoutMillis; - this.invokeCallback = invokeCallback; - this.releaseSemaphore = releaseSemaphore; FUTURE_TABLE.put(opaque, this); } @@ -90,10 +79,6 @@ public class ResponseFuture { return FUTURE_TABLE.get(opaque); } - public void removeFuture() { - FUTURE_TABLE.remove(opaque); - } - /** * whether timeout * @@ -104,15 +89,6 @@ public class ResponseFuture { return diff > this.timeoutMillis; } - /** - * execute invoke callback - */ - public void executeInvokeCallback() { - if (invokeCallback != null) { - invokeCallback.operationComplete(this); - } - } - public boolean isSendOK() { return sendOk; } @@ -129,52 +105,4 @@ public class ResponseFuture { return cause; } - public long getOpaque() { - return opaque; - } - - public long getTimeoutMillis() { - return timeoutMillis; - } - - public long getBeginTimestamp() { - return beginTimestamp; - } - - public InvokeCallback getInvokeCallback() { - return invokeCallback; - } - - /** - * release - */ - public void release() { - if (this.releaseSemaphore != null) { - this.releaseSemaphore.release(); - } - } - - /** - * scan future table - */ - public static void scanFutureTable() { - Iterator> it = FUTURE_TABLE.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry next = it.next(); - ResponseFuture future = next.getValue(); - if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) > System.currentTimeMillis()) { - continue; - } - try { - // todo: use thread pool to execute the async callback, otherwise will block the scan thread - future.release(); - future.executeInvokeCallback(); - } catch (Exception ex) { - log.error("ScanFutureTable, execute callback error, requestId: {}", future.getOpaque(), ex); - } - it.remove(); - log.debug("Remove timeout request: {}", future); - } - } - } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java index b4978172f1..f57ff0b609 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.server; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; import org.apache.dolphinscheduler.extract.base.StandardRpcRequest; import org.apache.dolphinscheduler.extract.base.StandardRpcResponse; import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter; @@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import lombok.extern.slf4j.Slf4j; @@ -42,14 +42,14 @@ import io.netty.handler.timeout.IdleStateEvent; @Slf4j @ChannelHandler.Sharable -public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter { +class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter { - private final NettyRemotingServer nettyRemotingServer; + private final ExecutorService methodInvokeExecutor; private final Map methodInvokerMap; - public JdkDynamicServerHandler(NettyRemotingServer nettyRemotingServer) { - this.nettyRemotingServer = nettyRemotingServer; + JdkDynamicServerHandler(ExecutorService methodInvokeExecutor) { + this.methodInvokeExecutor = methodInvokeExecutor; this.methodInvokerMap = new ConcurrentHashMap<>(); } @@ -90,7 +90,7 @@ public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter { channel.writeAndFlush(response); return; } - nettyRemotingServer.getDefaultExecutor().execute(() -> { + methodInvokeExecutor.execute(() -> { StandardRpcResponse iRpcResponse; try { StandardRpcRequest standardRpcRequest = diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java similarity index 75% rename from dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java rename to dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java index 365a17dd03..9beeaced3d 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java @@ -15,15 +15,13 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.base; +package org.apache.dolphinscheduler.extract.base.server; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.exception.RemoteException; import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder; import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder; -import org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler; -import org.apache.dolphinscheduler.extract.base.server.ServerMethodInvoker; import org.apache.dolphinscheduler.extract.base.utils.Constants; import org.apache.dolphinscheduler.extract.base.utils.NettyUtils; @@ -32,6 +30,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; @@ -48,12 +47,15 @@ import io.netty.handler.timeout.IdleStateHandler; * remoting netty server */ @Slf4j -public class NettyRemotingServer { +class NettyRemotingServer { private final ServerBootstrap serverBootstrap = new ServerBootstrap(); - private final ExecutorService defaultExecutor = ThreadUtils - .newDaemonFixedThreadExecutor("NettyRemotingServerThread", Runtime.getRuntime().availableProcessors() * 2); + @Getter + private final String serverName; + + @Getter + private final ExecutorService methodInvokerExecutor; private final EventLoopGroup bossGroup; @@ -61,16 +63,20 @@ public class NettyRemotingServer { private final NettyServerConfig serverConfig; - private final JdkDynamicServerHandler serverHandler = new JdkDynamicServerHandler(this); + private final JdkDynamicServerHandler channelHandler; private final AtomicBoolean isStarted = new AtomicBoolean(false); - public NettyRemotingServer(final NettyServerConfig serverConfig) { + NettyRemotingServer(final NettyServerConfig serverConfig) { this.serverConfig = serverConfig; + this.serverName = serverConfig.getServerName(); + this.methodInvokerExecutor = ThreadUtils.newDaemonFixedThreadExecutor( + serverName + "MethodInvoker-%d", Runtime.getRuntime().availableProcessors() * 2 + 1); + this.channelHandler = new JdkDynamicServerHandler(methodInvokerExecutor); ThreadFactory bossThreadFactory = - ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "BossThread_%s"); + ThreadUtils.newDaemonThreadFactory(serverName + "BossThread-%d"); ThreadFactory workerThreadFactory = - ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "WorkerThread_%s"); + ThreadUtils.newDaemonThreadFactory(serverName + "WorkerThread-%d"); if (Epoll.isAvailable()) { this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory); this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory); @@ -80,7 +86,7 @@ public class NettyRemotingServer { } } - public void start() { + void start() { if (isStarted.compareAndSet(false, true)) { this.serverBootstrap .group(this.bossGroup, this.workGroup) @@ -103,9 +109,9 @@ public class NettyRemotingServer { try { future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); } catch (Exception e) { - log.error("{} bind fail {}, exit", serverConfig.getServerName(), e.getMessage(), e); throw new RemoteException( - String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort())); + String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()), + e); } if (future.isSuccess()) { @@ -113,14 +119,9 @@ public class NettyRemotingServer { return; } - if (future.cause() != null) { - throw new RemoteException( - String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()), - future.cause()); - } else { - throw new RemoteException( - String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort())); - } + throw new RemoteException( + String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()), + future.cause()); } } @@ -135,18 +136,14 @@ public class NettyRemotingServer { .addLast("decoder", new TransporterDecoder()) .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) - .addLast("handler", serverHandler); - } - - public ExecutorService getDefaultExecutor() { - return defaultExecutor; + .addLast("handler", channelHandler); } - public void registerMethodInvoker(ServerMethodInvoker methodInvoker) { - serverHandler.registerMethodInvoker(methodInvoker); + void registerMethodInvoker(ServerMethodInvoker methodInvoker) { + channelHandler.registerMethodInvoker(methodInvoker); } - public void close() { + void close() { if (isStarted.compareAndSet(true, false)) { try { if (bossGroup != null) { @@ -155,7 +152,7 @@ public class NettyRemotingServer { if (workGroup != null) { this.workGroup.shutdownGracefully(); } - defaultExecutor.shutdown(); + methodInvokerExecutor.shutdown(); } catch (Exception ex) { log.error("netty server close exception", ex); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java similarity index 84% rename from dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java rename to dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java index 6bf1b8d31c..70ed0529e8 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.base; +package org.apache.dolphinscheduler.extract.base.server; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import lombok.experimental.UtilityClass; @UtilityClass -public class NettyRemotingServerFactory { +class NettyRemotingServerFactory { - public NettyRemotingServer buildNettyRemotingServer(NettyServerConfig nettyServerConfig) { + NettyRemotingServer buildNettyRemotingServer(NettyServerConfig nettyServerConfig) { return new NettyRemotingServer(nettyServerConfig); } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java new file mode 100644 index 0000000000..213868ba46 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.base.server; + +import org.apache.dolphinscheduler.extract.base.RpcMethod; +import org.apache.dolphinscheduler.extract.base.RpcService; +import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; + +import java.lang.reflect.Method; + +import lombok.extern.slf4j.Slf4j; + +/** + * The RpcServer based on Netty. The server will register the method invoker and provide the service to the client. + * Once the server is started, it will listen on the port and wait for the client to connect. + *
+ *          RpcServer rpcServer = new RpcServer(new NettyServerConfig());
+ *          rpcServer.registerServerMethodInvokerProvider(new ServerMethodInvokerProviderImpl());
+ *          rpcServer.start();
+ * 
+ */ +@Slf4j +public class RpcServer implements ServerMethodInvokerRegistry, AutoCloseable { + + private final NettyRemotingServer nettyRemotingServer; + + public RpcServer(NettyServerConfig nettyServerConfig) { + this.nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig); + } + + public void start() { + nettyRemotingServer.start(); + } + + @Override + public void registerServerMethodInvokerProvider(Object serverMethodInvokerProviderBean) { + for (Class anInterface : serverMethodInvokerProviderBean.getClass().getInterfaces()) { + if (anInterface.getAnnotation(RpcService.class) == null) { + continue; + } + for (Method method : anInterface.getDeclaredMethods()) { + RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class); + if (rpcMethod == null) { + continue; + } + ServerMethodInvoker serverMethodInvoker = + new ServerMethodInvokerImpl(serverMethodInvokerProviderBean, method); + nettyRemotingServer.registerMethodInvoker(serverMethodInvoker); + log.debug("Register ServerMethodInvoker: {} to bean: {}", + serverMethodInvoker.getMethodIdentify(), serverMethodInvoker.getMethodProviderIdentify()); + } + } + } + + @Override + public void close() { + nettyRemotingServer.close(); + } +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java index ee633217b2..151b54bb97 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java @@ -17,10 +17,12 @@ package org.apache.dolphinscheduler.extract.base.server; -public interface ServerMethodInvoker { +interface ServerMethodInvoker { String getMethodIdentify(); + String getMethodProviderIdentify(); + Object invoke(final Object... arg) throws Throwable; } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java index eea9da5e14..4c29650aa0 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.extract.base.server; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -public class ServerMethodInvokerImpl implements ServerMethodInvoker { +class ServerMethodInvokerImpl implements ServerMethodInvoker { private final Object serviceBean; @@ -28,7 +28,7 @@ public class ServerMethodInvokerImpl implements ServerMethodInvoker { private final String methodIdentify; - public ServerMethodInvokerImpl(Object serviceBean, Method method) { + ServerMethodInvokerImpl(Object serviceBean, Method method) { this.serviceBean = serviceBean; this.method = method; this.methodIdentify = method.toGenericString(); @@ -48,4 +48,9 @@ public class ServerMethodInvokerImpl implements ServerMethodInvoker { public String getMethodIdentify() { return methodIdentify; } + + @Override + public String getMethodProviderIdentify() { + return serviceBean.getClass().getName(); + } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java new file mode 100644 index 0000000000..4e56be2617 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.base.server; + +interface ServerMethodInvokerRegistry { + + /** + * Register service object, which will be used to invoke the {@link ServerMethodInvoker}. + * The serverMethodInvokerProviderObject should implement with interface which contains {@link org.apache.dolphinscheduler.extract.base.RpcService} annotation. + */ + void registerServerMethodInvokerProvider(Object serverMethodInvokerProviderObject); + +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java index 2b87a70080..de4943990c 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java @@ -17,11 +17,7 @@ package org.apache.dolphinscheduler.extract.base.server; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; -import org.apache.dolphinscheduler.extract.base.RpcMethod; -import org.apache.dolphinscheduler.extract.base.RpcService; - -import java.lang.reflect.Method; +import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import lombok.extern.slf4j.Slf4j; @@ -29,38 +25,21 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.lang.Nullable; +/** + * The RpcServer which will auto discovery the {@link ServerMethodInvoker} from Spring container. + */ @Slf4j -public class SpringServerMethodInvokerDiscovery implements BeanPostProcessor { +public class SpringServerMethodInvokerDiscovery extends RpcServer implements BeanPostProcessor { - protected final NettyRemotingServer nettyRemotingServer; - - public SpringServerMethodInvokerDiscovery(NettyRemotingServer nettyRemotingServer) { - this.nettyRemotingServer = nettyRemotingServer; + public SpringServerMethodInvokerDiscovery(NettyServerConfig nettyServerConfig) { + super(nettyServerConfig); } @Nullable @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - Class[] interfaces = bean.getClass().getInterfaces(); - for (Class anInterface : interfaces) { - if (anInterface.getAnnotation(RpcService.class) == null) { - continue; - } - registerRpcMethodInvoker(anInterface, bean, beanName); - } + registerServerMethodInvokerProvider(bean); return bean; } - private void registerRpcMethodInvoker(Class anInterface, Object bean, String beanName) { - Method[] declaredMethods = anInterface.getDeclaredMethods(); - for (Method method : declaredMethods) { - RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class); - if (rpcMethod == null) { - continue; - } - ServerMethodInvoker methodInvoker = new ServerMethodInvokerImpl(bean, method); - nettyRemotingServer.registerMethodInvoker(methodInvoker); - log.debug("Register ServerMethodInvoker: {} to bean: {}", methodInvoker.getMethodIdentify(), beanName); - } - } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java index 521cf7c75a..92ed49934c 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.extract.base.client; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; @@ -37,7 +36,7 @@ import org.junit.jupiter.api.Test; public class SingletonJdkDynamicRpcClientProxyFactoryTest { - private NettyRemotingServer nettyRemotingServer; + private SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery; private String serverAddress; @@ -48,11 +47,10 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest { .serverName("ApiServer") .listenPort(listenPort) .build(); - nettyRemotingServer = new NettyRemotingServer(nettyServerConfig); - nettyRemotingServer.start(); serverAddress = "localhost:" + listenPort; - new SpringServerMethodInvokerDiscovery(nettyRemotingServer) - .postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl"); + springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(nettyServerConfig); + springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new IServiceImpl()); + springServerMethodInvokerDiscovery.start(); } @Test @@ -82,7 +80,7 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest { @AfterEach public void tearDown() { - nettyRemotingServer.close(); + springServerMethodInvokerDiscovery.close(); } @RpcService diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java index 0eaf885d11..ab89b021d6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.rpc; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -31,21 +30,8 @@ import org.springframework.stereotype.Component; public class MasterRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable { public MasterRpcServer(MasterConfig masterConfig) { - super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder() - .serverName("MasterRpcServer").listenPort(masterConfig.getListenPort()).build())); - } - - public void start() { - log.info("Starting MasterRPCServer..."); - nettyRemotingServer.start(); - log.info("Started MasterRPCServer..."); - } - - @Override - public void close() { - log.info("Closing MasterRPCServer..."); - nettyRemotingServer.close(); - log.info("Closed MasterRPCServer..."); + super(NettyServerConfig.builder().serverName("MasterRpcServer").listenPort(masterConfig.getListenPort()) + .build()); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java new file mode 100644 index 0000000000..1e5a77edb3 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.rpc; + +import org.apache.dolphinscheduler.server.master.config.MasterConfig; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class MasterRpcServerTest { + + private final MasterRpcServer masterRpcServer = new MasterRpcServer(new MasterConfig()); + + @Test + void testStart() { + Assertions.assertDoesNotThrow(masterRpcServer::start); + } + + @Test + void testClose() { + Assertions.assertDoesNotThrow(masterRpcServer::close); + } +} diff --git a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java index 1a3e4ab1e2..496983118f 100644 --- a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java +++ b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.microbench.rpc; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; @@ -46,18 +45,17 @@ import org.openjdk.jmh.infra.Blackhole; @BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime}) public class RpcBenchMarkTest extends AbstractBaseBenchmark { - private NettyRemotingServer nettyRemotingServer; + private SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery; private IService iService; @Setup public void before() { - nettyRemotingServer = new NettyRemotingServer( - NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build()); - nettyRemotingServer.start(); - SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery = - new SpringServerMethodInvokerDiscovery(nettyRemotingServer); + NettyServerConfig nettyServerConfig = + NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build(); + springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(nettyServerConfig); springServerMethodInvokerDiscovery.postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl"); + springServerMethodInvokerDiscovery.start(); iService = SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class); } @@ -72,6 +70,6 @@ public class RpcBenchMarkTest extends AbstractBaseBenchmark { @TearDown public void after() { - nettyRemotingServer.close(); + springServerMethodInvokerDiscovery.close(); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java index 7733fbba4f..b9f3855cf9 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.rpc; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -33,21 +32,8 @@ import org.springframework.stereotype.Service; public class WorkerRpcServer extends SpringServerMethodInvokerDiscovery implements Closeable { public WorkerRpcServer(WorkerConfig workerConfig) { - super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder() - .serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort()).build())); - } - - public void start() { - log.info("WorkerRpcServer starting..."); - nettyRemotingServer.start(); - log.info("WorkerRpcServer started..."); - } - - @Override - public void close() { - log.info("WorkerRpcServer closing"); - nettyRemotingServer.close(); - log.info("WorkerRpcServer closed"); + super(NettyServerConfig.builder().serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort()) + .build()); } } diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java new file mode 100644 index 0000000000..d27eaeeadf --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.rpc; + +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class WorkerRpcServerTest { + + private final WorkerRpcServer workerRpcServer = new WorkerRpcServer(new WorkerConfig()); + + @Test + void testStart() { + Assertions.assertDoesNotThrow(workerRpcServer::start); + } + + @Test + void testClose() { + Assertions.assertDoesNotThrow(workerRpcServer::close); + } + +}