diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 678fe84f90..df0c13ad38 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -50,22 +50,51 @@ public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
+ /**
+ * bootstrap
+ */
private final Bootstrap bootstrap = new Bootstrap();
+ /**
+ * encoder
+ */
private final NettyEncoder encoder = new NettyEncoder();
+ /**
+ * channels
+ */
private final ConcurrentHashMap
channels = new ConcurrentHashMap();
+ /**
+ * default executor
+ */
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+ /**
+ * started flag
+ */
private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ /**
+ * worker group
+ */
private final NioEventLoopGroup workerGroup;
+ /**
+ * client handler
+ */
private final NettyClientHandler clientHandler = new NettyClientHandler(this);
+ /**
+ * netty client config
+ */
private final NettyClientConfig clientConfig;
+ /**
+ * netty client init
+ *
+ * @param clientConfig client config
+ */
public NettyRemotingClient(final NettyClientConfig clientConfig){
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
@@ -79,6 +108,9 @@ public class NettyRemotingClient {
this.start();
}
+ /**
+ * netty server start
+ */
private void start(){
this.bootstrap
@@ -97,18 +129,36 @@ public class NettyRemotingClient {
encoder);
}
});
- //
isStarted.compareAndSet(false, true);
}
+ /**
+ * register processor
+ *
+ * @param commandType command type
+ * @param processor processor
+ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
registerProcessor(commandType, processor, null);
}
+ /**
+ * register processor
+ *
+ * @param commandType command type
+ * @param processor processor
+ * @param executor thread executor
+ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor);
}
+ /**
+ * send connect
+ * @param address address
+ * @param command command
+ * @throws RemotingException
+ */
public void send(final Address address, final Command command) throws RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
@@ -132,6 +182,11 @@ public class NettyRemotingClient {
}
}
+ /**
+ * get channel
+ * @param address address
+ * @return channel
+ */
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
if(channel != null && channel.isActive()){
@@ -140,6 +195,12 @@ public class NettyRemotingClient {
return createChannel(address, true);
}
+ /**
+ * create channel
+ * @param address address
+ * @param isSync is sync
+ * @return channel
+ */
public Channel createChannel(Address address, boolean isSync) {
ChannelFuture future;
try {
@@ -160,10 +221,17 @@ public class NettyRemotingClient {
return null;
}
+ /**
+ * get default thread executor
+ * @return thread executor
+ */
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
+ /**
+ * close client
+ */
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
@@ -181,6 +249,9 @@ public class NettyRemotingClient {
}
}
+ /**
+ * close channel
+ */
private void closeChannels(){
for (Channel channel : this.channels.values()) {
channel.close();
@@ -188,6 +259,10 @@ public class NettyRemotingClient {
this.channels.clear();
}
+ /**
+ * remove channel
+ * @param address address
+ */
public void removeChannel(Address address){
Channel channel = this.channels.remove(address);
if(channel != null){
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index 5823dbb088..c69bf09540 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -48,28 +48,58 @@ public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
+ /**
+ * server bootstart
+ */
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+ /**
+ * encoder
+ */
private final NettyEncoder encoder = new NettyEncoder();
+ /**
+ * default executor
+ */
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+ /**
+ * boss group
+ */
private final NioEventLoopGroup bossGroup;
+ /**
+ * worker group
+ */
private final NioEventLoopGroup workGroup;
+ /**
+ * server config
+ */
private final NettyServerConfig serverConfig;
+ /**
+ * server handler
+ */
private final NettyServerHandler serverHandler = new NettyServerHandler(this);
+ /**
+ * started flag
+ */
private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ /**
+ * server init
+ *
+ * @param serverConfig server config
+ */
public NettyRemotingServer(final NettyServerConfig serverConfig){
this.serverConfig = serverConfig;
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
+ @Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
@@ -78,12 +108,16 @@ public class NettyRemotingServer {
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
+ @Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
}
+ /**
+ * server start
+ */
public void start(){
if(this.isStarted.get()){
@@ -125,6 +159,11 @@ public class NettyRemotingServer {
isStarted.compareAndSet(false, true);
}
+ /**
+ * init netty channel
+ * @param ch socket channel
+ * @throws Exception
+ */
private void initNettyChannel(NioSocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder);
@@ -132,14 +171,30 @@ public class NettyRemotingServer {
pipeline.addLast("handler", serverHandler);
}
+ /**
+ * register processor
+ * @param commandType command type
+ * @param processor processor
+ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
+ /**
+ * register processor
+ *
+ * @param commandType command type
+ * @param processor processor
+ * @param executor thread executor
+ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor);
}
+ /**
+ * get default thread executor
+ * @return thread executor
+ */
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
index 998f4ee7d5..caa4fbdd17 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -38,6 +38,14 @@ public class NettyDecoder extends ReplayingDecoder {
private final CommandHeader commandHeader = new CommandHeader();
+ /**
+ * decode
+ *
+ * @param ctx channel handler context
+ * @param in byte buffer
+ * @param out out content
+ * @throws Exception
+ */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List