Browse Source

Optimizing the scope of RPC base classes (#15946)

* Optimizing the scope of RPC base classes

* Fix UT
3.2.2-release-bak
Wenjun Ruan 7 months ago committed by GitHub
parent
commit
99d8276be7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 16
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
  2. 28
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java
  3. 19
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
  4. 5
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java
  5. 5
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
  6. 2
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
  7. 2
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
  8. 5
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
  9. 18
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
  10. 110
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
  11. 2
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java
  12. 1
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
  13. 5
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
  14. 76
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
  15. 12
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
  16. 57
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
  17. 6
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java
  18. 74
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java
  19. 4
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
  20. 9
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
  21. 28
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java
  22. 37
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
  23. 12
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
  24. 18
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
  25. 38
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
  26. 14
      dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
  27. 18
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
  28. 39
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java

16
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.alert.rpc; package org.apache.dolphinscheduler.alert.rpc;
import org.apache.dolphinscheduler.alert.config.AlertConfig; import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
@ -31,20 +30,7 @@ import org.springframework.stereotype.Service;
public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable { public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable {
public AlertRpcServer(AlertConfig alertConfig) { public AlertRpcServer(AlertConfig alertConfig) {
super(NettyRemotingServerFactory.buildNettyRemotingServer( super(NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build());
NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build()));
} }
public void start() {
log.info("Starting AlertRpcServer...");
nettyRemotingServer.start();
log.info("Started AlertRpcServer...");
}
@Override
public void close() {
log.info("Closing AlertRpcServer...");
nettyRemotingServer.close();
log.info("Closed AlertRpcServer...");
}
} }

28
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java

@ -15,22 +15,24 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.processor; package org.apache.dolphinscheduler.alert.rpc;
import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator; import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.mockito.Mockito; import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** class AlertRpcServerTest {
* dependency config
*/ private final AlertRpcServer alertRpcServer = new AlertRpcServer(new AlertConfig());
@Configuration
public class TaskResponseProcessorTestConfig {
@Bean @Test
public DataQualityResultOperator dataQualityResultOperator() { void testStart() {
return Mockito.mock(DataQualityResultOperator.class); alertRpcServer.start();
} }
@Test
void testClose() {
alertRpcServer.close();
}
} }

19
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.common.ILogService;
@ -91,7 +90,7 @@ public class LoggerServiceTest {
@Mock @Mock
private TaskDefinitionMapper taskDefinitionMapper; private TaskDefinitionMapper taskDefinitionMapper;
private NettyRemotingServer nettyRemotingServer; private SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery;
private int nettyServerPort = 18080; private int nettyServerPort = 18080;
@ -103,11 +102,10 @@ public class LoggerServiceTest {
return; return;
} }
nettyRemotingServer = new NettyRemotingServer(NettyServerConfig.builder().listenPort(nettyServerPort).build()); springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(
nettyRemotingServer.start(); NettyServerConfig.builder().serverName("TestLogServer").listenPort(nettyServerPort).build());
SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery = springServerMethodInvokerDiscovery.start();
new SpringServerMethodInvokerDiscovery(nettyRemotingServer); springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new ILogService() {
springServerMethodInvokerDiscovery.postProcessAfterInitialization(new ILogService() {
@Override @Override
public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) { public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) {
@ -142,13 +140,14 @@ public class LoggerServiceTest {
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) { public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {
} }
}, "iLogServiceImpl"); });
springServerMethodInvokerDiscovery.start();
} }
@AfterEach @AfterEach
public void tearDown() { public void tearDown() {
if (nettyRemotingServer != null) { if (springServerMethodInvokerDiscovery != null) {
nettyRemotingServer.close(); springServerMethodInvokerDiscovery.close();
} }
} }

5
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java

@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.extract.base.client; package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.Method; import java.lang.reflect.Method;
public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker { abstract class AbstractClientMethodInvoker implements ClientMethodInvoker {
protected final String methodIdentifier; protected final String methodIdentifier;
@ -32,7 +31,7 @@ public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker {
protected final Host serverHost; protected final Host serverHost;
public BaseRemoteMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) { AbstractClientMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) {
this.serverHost = serverHost; this.serverHost = serverHost;
this.localMethod = localMethod; this.localMethod = localMethod;
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;

5
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.client;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.Host;
@ -31,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class ClientInvocationHandler implements InvocationHandler { class ClientInvocationHandler implements InvocationHandler {
private final NettyRemotingClient nettyRemotingClient; private final NettyRemotingClient nettyRemotingClient;
@ -39,7 +38,7 @@ public class ClientInvocationHandler implements InvocationHandler {
private final Host serverHost; private final Host serverHost;
public ClientInvocationHandler(Host serverHost, NettyRemotingClient nettyRemotingClient) { ClientInvocationHandler(Host serverHost, NettyRemotingClient nettyRemotingClient) {
this.serverHost = checkNotNull(serverHost); this.serverHost = checkNotNull(serverHost);
this.nettyRemotingClient = checkNotNull(nettyRemotingClient); this.nettyRemotingClient = checkNotNull(nettyRemotingClient);
this.methodInvokerMap = new ConcurrentHashMap<>(); this.methodInvokerMap = new ConcurrentHashMap<>();

2
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.extract.base.client;
import java.lang.reflect.Method; import java.lang.reflect.Method;
public interface ClientMethodInvoker { interface ClientMethodInvoker {
Object invoke(Object proxy, Method method, Object[] args) throws Throwable; Object invoke(Object proxy, Method method, Object[] args) throws Throwable;

2
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.extract.base.client; package org.apache.dolphinscheduler.extract.base.client;
public interface IRpcClientProxyFactory { interface IRpcClientProxyFactory {
/** /**
* Create the client proxy. * Create the client proxy.

5
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.extract.base.client; package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -34,7 +33,7 @@ import com.google.common.cache.LoadingCache;
/** /**
* This class is used to create a proxy client which will transform local method invocation to remove invocation. * This class is used to create a proxy client which will transform local method invocation to remove invocation.
*/ */
public class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory { class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory {
private final NettyRemotingClient nettyRemotingClient; private final NettyRemotingClient nettyRemotingClient;
@ -49,7 +48,7 @@ public class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory {
} }
}); });
public JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) { JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) {
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
} }

18
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java

@ -15,16 +15,15 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.extract.base; package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture; import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter; import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter; import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer; import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer;
import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils; import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
import java.util.concurrent.ExecutorService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -38,11 +37,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final NettyRemotingClient nettyRemotingClient; private final NettyRemotingClient nettyRemotingClient;
private final ExecutorService callbackExecutor; public NettyClientHandler(NettyRemotingClient nettyRemotingClient) {
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
} }
@Override @Override
@ -64,13 +60,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
} }
StandardRpcResponse deserialize = JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class); StandardRpcResponse deserialize = JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class);
future.setIRpcResponse(deserialize); future.setIRpcResponse(deserialize);
future.release(); future.putResponse(deserialize);
if (future.getInvokeCallback() != null) {
future.removeFuture();
this.callbackExecutor.execute(future::executeInvokeCallback);
} else {
future.putResponse(deserialize);
}
} }
@Override @Override

110
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java

@ -15,33 +15,24 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.extract.base; package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException; import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException; import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.extract.base.future.InvokeCallback;
import org.apache.dolphinscheduler.extract.base.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture; import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter; import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder; import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder; import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.extract.base.utils.Constants; import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils; import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -71,14 +62,8 @@ public class NettyRemotingClient implements AutoCloseable {
private final NettyClientConfig clientConfig; private final NettyClientConfig clientConfig;
private final Semaphore asyncSemaphore = new Semaphore(1024, true);
private final ExecutorService callbackExecutor;
private final NettyClientHandler clientHandler; private final NettyClientHandler clientHandler;
private final ScheduledExecutorService responseFutureExecutor;
public NettyRemotingClient(final NettyClientConfig clientConfig) { public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-"); ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
@ -87,18 +72,7 @@ public class NettyRemotingClient implements AutoCloseable {
} else { } else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory); this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
} }
this.callbackExecutor = new ThreadPoolExecutor( this.clientHandler = new NettyClientHandler(this);
Constants.CPUS,
Constants.CPUS,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000),
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
this.start(); this.start();
} }
@ -127,66 +101,9 @@ public class NettyRemotingClient implements AutoCloseable {
.addLast(new TransporterDecoder(), clientHandler, new TransporterEncoder()); .addLast(new TransporterDecoder(), clientHandler, new TransporterEncoder());
} }
}); });
this.responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS);
isStarted.compareAndSet(false, true); isStarted.compareAndSet(false, true);
} }
public void sendAsync(final Host host,
final Transporter transporter,
final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException("network error");
}
/*
* request unique identification
*/
final long opaque = transporter.getHeader().getOpaque();
/*
* control concurrency number
*/
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
/*
* response future
*/
final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis,
invokeCallback,
releaseSemaphore);
try {
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
try {
responseFuture.executeInvokeCallback();
} catch (Exception ex) {
log.error("execute callback error", ex);
} finally {
responseFuture.release();
}
});
} catch (Exception ex) {
responseFuture.release();
throw new RemotingException(String.format("Send transporter to host: %s failed", host), ex);
}
} else {
String message = String.format(
"try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
throw new RemotingTooMuchRequestException(message);
}
}
public IRpcResponse sendSync(final Host host, final Transporter transporter, public IRpcResponse sendSync(final Host host, final Transporter transporter,
final long timeoutMillis) throws InterruptedException, RemotingException { final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host); final Channel channel = getChannel(host);
@ -194,7 +111,7 @@ public class NettyRemotingClient implements AutoCloseable {
throw new RemotingException(String.format("connect to : %s fail", host)); throw new RemotingException(String.format("connect to : %s fail", host));
} }
final long opaque = transporter.getHeader().getOpaque(); final long opaque = transporter.getHeader().getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
channel.writeAndFlush(transporter).addListener(future -> { channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) { if (future.isSuccess()) {
responseFuture.setSendOk(true); responseFuture.setSendOk(true);
@ -220,7 +137,7 @@ public class NettyRemotingClient implements AutoCloseable {
return iRpcResponse; return iRpcResponse;
} }
public Channel getChannel(Host host) { private Channel getChannel(Host host) {
Channel channel = channels.get(host); Channel channel = channels.get(host);
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
return channel; return channel;
@ -235,9 +152,9 @@ public class NettyRemotingClient implements AutoCloseable {
* @param isSync sync flag * @param isSync sync flag
* @return channel * @return channel
*/ */
public Channel createChannel(Host host, boolean isSync) { private Channel createChannel(Host host, boolean isSync) {
ChannelFuture future;
try { try {
ChannelFuture future;
synchronized (bootstrap) { synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
} }
@ -249,10 +166,11 @@ public class NettyRemotingClient implements AutoCloseable {
channels.put(host, channel); channels.put(host, channel);
return channel; return channel;
} }
} catch (Exception ex) { throw new IllegalArgumentException("connect to host: " + host + " failed");
log.warn(String.format("connect to %s error", host), ex); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Connect to host: " + host + " failed", e);
} }
return null;
} }
@Override @Override
@ -263,12 +181,6 @@ public class NettyRemotingClient implements AutoCloseable {
if (workerGroup != null) { if (workerGroup != null) {
this.workerGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully();
} }
if (callbackExecutor != null) {
this.callbackExecutor.shutdownNow();
}
if (this.responseFutureExecutor != null) {
this.responseFutureExecutor.shutdownNow();
}
log.info("netty client closed"); log.info("netty client closed");
} catch (Exception ex) { } catch (Exception ex) {
log.error("netty client close exception", ex); log.error("netty client close exception", ex);

2
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.extract.base; package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;

1
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.extract.base.client; package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
public class SingletonJdkDynamicRpcClientProxyFactory { public class SingletonJdkDynamicRpcClientProxyFactory {

5
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.extract.base.client; package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.IRpcResponse; import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest; import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException; import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
@ -29,9 +28,9 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.Method; import java.lang.reflect.Method;
public class SyncClientMethodInvoker extends BaseRemoteMethodInvoker { class SyncClientMethodInvoker extends AbstractClientMethodInvoker {
public SyncClientMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) { SyncClientMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) {
super(serverHost, localMethod, nettyRemotingClient); super(serverHost, localMethod, nettyRemotingClient);
} }

76
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java

@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.extract.base.future;
import org.apache.dolphinscheduler.extract.base.IRpcResponse; import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -34,17 +32,13 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class ResponseFuture { public class ResponseFuture {
private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256); private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>();
private final long opaque; private final long opaque;
// remove the timeout // remove the timeout
private final long timeoutMillis; private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final ReleaseSemaphore releaseSemaphore;
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);
private final long beginTimestamp = System.currentTimeMillis(); private final long beginTimestamp = System.currentTimeMillis();
@ -57,14 +51,9 @@ public class ResponseFuture {
private Throwable cause; private Throwable cause;
public ResponseFuture(long opaque, public ResponseFuture(long opaque, long timeoutMillis) {
long timeoutMillis,
InvokeCallback invokeCallback,
ReleaseSemaphore releaseSemaphore) {
this.opaque = opaque; this.opaque = opaque;
this.timeoutMillis = timeoutMillis; this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.releaseSemaphore = releaseSemaphore;
FUTURE_TABLE.put(opaque, this); FUTURE_TABLE.put(opaque, this);
} }
@ -90,10 +79,6 @@ public class ResponseFuture {
return FUTURE_TABLE.get(opaque); return FUTURE_TABLE.get(opaque);
} }
public void removeFuture() {
FUTURE_TABLE.remove(opaque);
}
/** /**
* whether timeout * whether timeout
* *
@ -104,15 +89,6 @@ public class ResponseFuture {
return diff > this.timeoutMillis; return diff > this.timeoutMillis;
} }
/**
* execute invoke callback
*/
public void executeInvokeCallback() {
if (invokeCallback != null) {
invokeCallback.operationComplete(this);
}
}
public boolean isSendOK() { public boolean isSendOK() {
return sendOk; return sendOk;
} }
@ -129,52 +105,4 @@ public class ResponseFuture {
return cause; return cause;
} }
public long getOpaque() {
return opaque;
}
public long getTimeoutMillis() {
return timeoutMillis;
}
public long getBeginTimestamp() {
return beginTimestamp;
}
public InvokeCallback getInvokeCallback() {
return invokeCallback;
}
/**
* release
*/
public void release() {
if (this.releaseSemaphore != null) {
this.releaseSemaphore.release();
}
}
/**
* scan future table
*/
public static void scanFutureTable() {
Iterator<Map.Entry<Long, ResponseFuture>> it = FUTURE_TABLE.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, ResponseFuture> next = it.next();
ResponseFuture future = next.getValue();
if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) > System.currentTimeMillis()) {
continue;
}
try {
// todo: use thread pool to execute the async callback, otherwise will block the scan thread
future.release();
future.executeInvokeCallback();
} catch (Exception ex) {
log.error("ScanFutureTable, execute callback error, requestId: {}", future.getOpaque(), ex);
}
it.remove();
log.debug("Remove timeout request: {}", future);
}
}
} }

12
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.server;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest; import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
import org.apache.dolphinscheduler.extract.base.StandardRpcResponse; import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter; import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -42,14 +42,14 @@ import io.netty.handler.timeout.IdleStateEvent;
@Slf4j @Slf4j
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter { class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter {
private final NettyRemotingServer nettyRemotingServer; private final ExecutorService methodInvokeExecutor;
private final Map<String, ServerMethodInvoker> methodInvokerMap; private final Map<String, ServerMethodInvoker> methodInvokerMap;
public JdkDynamicServerHandler(NettyRemotingServer nettyRemotingServer) { JdkDynamicServerHandler(ExecutorService methodInvokeExecutor) {
this.nettyRemotingServer = nettyRemotingServer; this.methodInvokeExecutor = methodInvokeExecutor;
this.methodInvokerMap = new ConcurrentHashMap<>(); this.methodInvokerMap = new ConcurrentHashMap<>();
} }
@ -90,7 +90,7 @@ public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter {
channel.writeAndFlush(response); channel.writeAndFlush(response);
return; return;
} }
nettyRemotingServer.getDefaultExecutor().execute(() -> { methodInvokeExecutor.execute(() -> {
StandardRpcResponse iRpcResponse; StandardRpcResponse iRpcResponse;
try { try {
StandardRpcRequest standardRpcRequest = StandardRpcRequest standardRpcRequest =

57
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java

@ -15,15 +15,13 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.extract.base; package org.apache.dolphinscheduler.extract.base.server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException; import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder; import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder; import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler;
import org.apache.dolphinscheduler.extract.base.server.ServerMethodInvoker;
import org.apache.dolphinscheduler.extract.base.utils.Constants; import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils; import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
@ -32,6 +30,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -48,12 +47,15 @@ import io.netty.handler.timeout.IdleStateHandler;
* remoting netty server * remoting netty server
*/ */
@Slf4j @Slf4j
public class NettyRemotingServer { class NettyRemotingServer {
private final ServerBootstrap serverBootstrap = new ServerBootstrap(); private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private final ExecutorService defaultExecutor = ThreadUtils @Getter
.newDaemonFixedThreadExecutor("NettyRemotingServerThread", Runtime.getRuntime().availableProcessors() * 2); private final String serverName;
@Getter
private final ExecutorService methodInvokerExecutor;
private final EventLoopGroup bossGroup; private final EventLoopGroup bossGroup;
@ -61,16 +63,20 @@ public class NettyRemotingServer {
private final NettyServerConfig serverConfig; private final NettyServerConfig serverConfig;
private final JdkDynamicServerHandler serverHandler = new JdkDynamicServerHandler(this); private final JdkDynamicServerHandler channelHandler;
private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isStarted = new AtomicBoolean(false);
public NettyRemotingServer(final NettyServerConfig serverConfig) { NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig; this.serverConfig = serverConfig;
this.serverName = serverConfig.getServerName();
this.methodInvokerExecutor = ThreadUtils.newDaemonFixedThreadExecutor(
serverName + "MethodInvoker-%d", Runtime.getRuntime().availableProcessors() * 2 + 1);
this.channelHandler = new JdkDynamicServerHandler(methodInvokerExecutor);
ThreadFactory bossThreadFactory = ThreadFactory bossThreadFactory =
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "BossThread_%s"); ThreadUtils.newDaemonThreadFactory(serverName + "BossThread-%d");
ThreadFactory workerThreadFactory = ThreadFactory workerThreadFactory =
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "WorkerThread_%s"); ThreadUtils.newDaemonThreadFactory(serverName + "WorkerThread-%d");
if (Epoll.isAvailable()) { if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory); this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory); this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
@ -80,7 +86,7 @@ public class NettyRemotingServer {
} }
} }
public void start() { void start() {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap this.serverBootstrap
.group(this.bossGroup, this.workGroup) .group(this.bossGroup, this.workGroup)
@ -103,9 +109,9 @@ public class NettyRemotingServer {
try { try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) { } catch (Exception e) {
log.error("{} bind fail {}, exit", serverConfig.getServerName(), e.getMessage(), e);
throw new RemoteException( throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort())); String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
e);
} }
if (future.isSuccess()) { if (future.isSuccess()) {
@ -113,14 +119,9 @@ public class NettyRemotingServer {
return; return;
} }
if (future.cause() != null) { throw new RemoteException(
throw new RemoteException( String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()), future.cause());
future.cause());
} else {
throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()));
}
} }
} }
@ -135,18 +136,14 @@ public class NettyRemotingServer {
.addLast("decoder", new TransporterDecoder()) .addLast("decoder", new TransporterDecoder())
.addLast("server-idle-handle", .addLast("server-idle-handle",
new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast("handler", serverHandler); .addLast("handler", channelHandler);
}
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
} }
public void registerMethodInvoker(ServerMethodInvoker methodInvoker) { void registerMethodInvoker(ServerMethodInvoker methodInvoker) {
serverHandler.registerMethodInvoker(methodInvoker); channelHandler.registerMethodInvoker(methodInvoker);
} }
public void close() { void close() {
if (isStarted.compareAndSet(true, false)) { if (isStarted.compareAndSet(true, false)) {
try { try {
if (bossGroup != null) { if (bossGroup != null) {
@ -155,7 +152,7 @@ public class NettyRemotingServer {
if (workGroup != null) { if (workGroup != null) {
this.workGroup.shutdownGracefully(); this.workGroup.shutdownGracefully();
} }
defaultExecutor.shutdown(); methodInvokerExecutor.shutdown();
} catch (Exception ex) { } catch (Exception ex) {
log.error("netty server close exception", ex); log.error("netty server close exception", ex);
} }

6
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java

@ -15,16 +15,16 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.extract.base; package org.apache.dolphinscheduler.extract.base.server;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import lombok.experimental.UtilityClass; import lombok.experimental.UtilityClass;
@UtilityClass @UtilityClass
public class NettyRemotingServerFactory { class NettyRemotingServerFactory {
public NettyRemotingServer buildNettyRemotingServer(NettyServerConfig nettyServerConfig) { NettyRemotingServer buildNettyRemotingServer(NettyServerConfig nettyServerConfig) {
return new NettyRemotingServer(nettyServerConfig); return new NettyRemotingServer(nettyServerConfig);
} }
} }

74
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.base.server;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;
/**
* The RpcServer based on Netty. The server will register the method invoker and provide the service to the client.
* Once the server is started, it will listen on the port and wait for the client to connect.
* <pre>
* RpcServer rpcServer = new RpcServer(new NettyServerConfig());
* rpcServer.registerServerMethodInvokerProvider(new ServerMethodInvokerProviderImpl());
* rpcServer.start();
* </pre>
*/
@Slf4j
public class RpcServer implements ServerMethodInvokerRegistry, AutoCloseable {
private final NettyRemotingServer nettyRemotingServer;
public RpcServer(NettyServerConfig nettyServerConfig) {
this.nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig);
}
public void start() {
nettyRemotingServer.start();
}
@Override
public void registerServerMethodInvokerProvider(Object serverMethodInvokerProviderBean) {
for (Class<?> anInterface : serverMethodInvokerProviderBean.getClass().getInterfaces()) {
if (anInterface.getAnnotation(RpcService.class) == null) {
continue;
}
for (Method method : anInterface.getDeclaredMethods()) {
RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
if (rpcMethod == null) {
continue;
}
ServerMethodInvoker serverMethodInvoker =
new ServerMethodInvokerImpl(serverMethodInvokerProviderBean, method);
nettyRemotingServer.registerMethodInvoker(serverMethodInvoker);
log.debug("Register ServerMethodInvoker: {} to bean: {}",
serverMethodInvoker.getMethodIdentify(), serverMethodInvoker.getMethodProviderIdentify());
}
}
}
@Override
public void close() {
nettyRemotingServer.close();
}
}

4
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java

@ -17,10 +17,12 @@
package org.apache.dolphinscheduler.extract.base.server; package org.apache.dolphinscheduler.extract.base.server;
public interface ServerMethodInvoker { interface ServerMethodInvoker {
String getMethodIdentify(); String getMethodIdentify();
String getMethodProviderIdentify();
Object invoke(final Object... arg) throws Throwable; Object invoke(final Object... arg) throws Throwable;
} }

9
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.extract.base.server;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
public class ServerMethodInvokerImpl implements ServerMethodInvoker { class ServerMethodInvokerImpl implements ServerMethodInvoker {
private final Object serviceBean; private final Object serviceBean;
@ -28,7 +28,7 @@ public class ServerMethodInvokerImpl implements ServerMethodInvoker {
private final String methodIdentify; private final String methodIdentify;
public ServerMethodInvokerImpl(Object serviceBean, Method method) { ServerMethodInvokerImpl(Object serviceBean, Method method) {
this.serviceBean = serviceBean; this.serviceBean = serviceBean;
this.method = method; this.method = method;
this.methodIdentify = method.toGenericString(); this.methodIdentify = method.toGenericString();
@ -48,4 +48,9 @@ public class ServerMethodInvokerImpl implements ServerMethodInvoker {
public String getMethodIdentify() { public String getMethodIdentify() {
return methodIdentify; return methodIdentify;
} }
@Override
public String getMethodProviderIdentify() {
return serviceBean.getClass().getName();
}
} }

28
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.base.server;
interface ServerMethodInvokerRegistry {
/**
* Register service object, which will be used to invoke the {@link ServerMethodInvoker}.
* The serverMethodInvokerProviderObject should implement with interface which contains {@link org.apache.dolphinscheduler.extract.base.RpcService} annotation.
*/
void registerServerMethodInvokerProvider(Object serverMethodInvokerProviderObject);
}

37
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java

@ -17,11 +17,7 @@
package org.apache.dolphinscheduler.extract.base.server; package org.apache.dolphinscheduler.extract.base.server;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -29,38 +25,21 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
/**
* The RpcServer which will auto discovery the {@link ServerMethodInvoker} from Spring container.
*/
@Slf4j @Slf4j
public class SpringServerMethodInvokerDiscovery implements BeanPostProcessor { public class SpringServerMethodInvokerDiscovery extends RpcServer implements BeanPostProcessor {
protected final NettyRemotingServer nettyRemotingServer; public SpringServerMethodInvokerDiscovery(NettyServerConfig nettyServerConfig) {
super(nettyServerConfig);
public SpringServerMethodInvokerDiscovery(NettyRemotingServer nettyRemotingServer) {
this.nettyRemotingServer = nettyRemotingServer;
} }
@Nullable @Nullable
@Override @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?>[] interfaces = bean.getClass().getInterfaces(); registerServerMethodInvokerProvider(bean);
for (Class<?> anInterface : interfaces) {
if (anInterface.getAnnotation(RpcService.class) == null) {
continue;
}
registerRpcMethodInvoker(anInterface, bean, beanName);
}
return bean; return bean;
} }
private void registerRpcMethodInvoker(Class<?> anInterface, Object bean, String beanName) {
Method[] declaredMethods = anInterface.getDeclaredMethods();
for (Method method : declaredMethods) {
RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
if (rpcMethod == null) {
continue;
}
ServerMethodInvoker methodInvoker = new ServerMethodInvokerImpl(bean, method);
nettyRemotingServer.registerMethodInvoker(methodInvoker);
log.debug("Register ServerMethodInvoker: {} to bean: {}", methodInvoker.getMethodIdentify(), beanName);
}
}
} }

12
dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.extract.base.client;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService; import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
@ -37,7 +36,7 @@ import org.junit.jupiter.api.Test;
public class SingletonJdkDynamicRpcClientProxyFactoryTest { public class SingletonJdkDynamicRpcClientProxyFactoryTest {
private NettyRemotingServer nettyRemotingServer; private SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery;
private String serverAddress; private String serverAddress;
@ -48,11 +47,10 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
.serverName("ApiServer") .serverName("ApiServer")
.listenPort(listenPort) .listenPort(listenPort)
.build(); .build();
nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
nettyRemotingServer.start();
serverAddress = "localhost:" + listenPort; serverAddress = "localhost:" + listenPort;
new SpringServerMethodInvokerDiscovery(nettyRemotingServer) springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(nettyServerConfig);
.postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl"); springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new IServiceImpl());
springServerMethodInvokerDiscovery.start();
} }
@Test @Test
@ -82,7 +80,7 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
@AfterEach @AfterEach
public void tearDown() { public void tearDown() {
nettyRemotingServer.close(); springServerMethodInvokerDiscovery.close();
} }
@RpcService @RpcService

18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.rpc; package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -31,21 +30,8 @@ import org.springframework.stereotype.Component;
public class MasterRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable { public class MasterRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable {
public MasterRpcServer(MasterConfig masterConfig) { public MasterRpcServer(MasterConfig masterConfig) {
super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder() super(NettyServerConfig.builder().serverName("MasterRpcServer").listenPort(masterConfig.getListenPort())
.serverName("MasterRpcServer").listenPort(masterConfig.getListenPort()).build())); .build());
}
public void start() {
log.info("Starting MasterRPCServer...");
nettyRemotingServer.start();
log.info("Started MasterRPCServer...");
}
@Override
public void close() {
log.info("Closing MasterRPCServer...");
nettyRemotingServer.close();
log.info("Closed MasterRPCServer...");
} }
} }

38
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class MasterRpcServerTest {
private final MasterRpcServer masterRpcServer = new MasterRpcServer(new MasterConfig());
@Test
void testStart() {
Assertions.assertDoesNotThrow(masterRpcServer::start);
}
@Test
void testClose() {
Assertions.assertDoesNotThrow(masterRpcServer::close);
}
}

14
dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.microbench.rpc; package org.apache.dolphinscheduler.microbench.rpc;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
@ -46,18 +45,17 @@ import org.openjdk.jmh.infra.Blackhole;
@BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime}) @BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
public class RpcBenchMarkTest extends AbstractBaseBenchmark { public class RpcBenchMarkTest extends AbstractBaseBenchmark {
private NettyRemotingServer nettyRemotingServer; private SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery;
private IService iService; private IService iService;
@Setup @Setup
public void before() { public void before() {
nettyRemotingServer = new NettyRemotingServer( NettyServerConfig nettyServerConfig =
NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build()); NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build();
nettyRemotingServer.start(); springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(nettyServerConfig);
SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
springServerMethodInvokerDiscovery.postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl"); springServerMethodInvokerDiscovery.postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl");
springServerMethodInvokerDiscovery.start();
iService = iService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class); SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class);
} }
@ -72,6 +70,6 @@ public class RpcBenchMarkTest extends AbstractBaseBenchmark {
@TearDown @TearDown
public void after() { public void after() {
nettyRemotingServer.close(); springServerMethodInvokerDiscovery.close();
} }
} }

18
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.rpc; package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -33,21 +32,8 @@ import org.springframework.stereotype.Service;
public class WorkerRpcServer extends SpringServerMethodInvokerDiscovery implements Closeable { public class WorkerRpcServer extends SpringServerMethodInvokerDiscovery implements Closeable {
public WorkerRpcServer(WorkerConfig workerConfig) { public WorkerRpcServer(WorkerConfig workerConfig) {
super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder() super(NettyServerConfig.builder().serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort())
.serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort()).build())); .build());
}
public void start() {
log.info("WorkerRpcServer starting...");
nettyRemotingServer.start();
log.info("WorkerRpcServer started...");
}
@Override
public void close() {
log.info("WorkerRpcServer closing");
nettyRemotingServer.close();
log.info("WorkerRpcServer closed");
} }
} }

39
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class WorkerRpcServerTest {
private final WorkerRpcServer workerRpcServer = new WorkerRpcServer(new WorkerConfig());
@Test
void testStart() {
Assertions.assertDoesNotThrow(workerRpcServer::start);
}
@Test
void testClose() {
Assertions.assertDoesNotThrow(workerRpcServer::close);
}
}
Loading…
Cancel
Save