Browse Source

[FIX_#3789][remote]cherry pick from dev to support netty heart beat

pull/3/MERGE
Kirs 4 years ago committed by lgcareer
parent
commit
f0ff8e2eb0
  1. 190
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  2. 166
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
  3. 7
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  4. 75
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
  5. 67
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
  6. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
  7. 7
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java

190
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -18,10 +18,16 @@
package org.apache.dolphinscheduler.remote; package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -38,6 +44,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy; import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -47,7 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* remoting netty client * remoting netty client
*/ */
public class NettyRemotingClient { public class NettyRemotingClient {
@ -59,7 +67,7 @@ public class NettyRemotingClient {
private final Bootstrap bootstrap = new Bootstrap(); private final Bootstrap bootstrap = new Bootstrap();
/** /**
* encoder * encoder
*/ */
private final NettyEncoder encoder = new NettyEncoder(); private final NettyEncoder encoder = new NettyEncoder();
@ -69,57 +77,69 @@ public class NettyRemotingClient {
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128); private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
/** /**
* started flag * started flag
*/ */
private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isStarted = new AtomicBoolean(false);
/** /**
* worker group * worker group
*/ */
private final NioEventLoopGroup workerGroup; private final EventLoopGroup workerGroup;
/** /**
* client config * client config
*/ */
private final NettyClientConfig clientConfig; private final NettyClientConfig clientConfig;
/** /**
* saync semaphore * saync semaphore
*/ */
private final Semaphore asyncSemaphore = new Semaphore(200, true); private final Semaphore asyncSemaphore = new Semaphore(200, true);
/** /**
* callback thread executor * callback thread executor
*/ */
private final ExecutorService callbackExecutor; private final ExecutorService callbackExecutor;
/** /**
* client handler * client handler
*/ */
private final NettyClientHandler clientHandler; private final NettyClientHandler clientHandler;
/** /**
* response future executor * response future executor
*/ */
private final ScheduledExecutorService responseFutureExecutor; private final ScheduledExecutorService responseFutureExecutor;
/** /**
* client init * client init
*
* @param clientConfig client config * @param clientConfig client config
*/ */
public NettyRemotingClient(final NettyClientConfig clientConfig){ public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { if (NettyUtils.useEpoll()) {
private AtomicInteger threadIndex = new AtomicInteger(0); this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
} }
}); });
} else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
}
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy()); new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
@ -128,26 +148,27 @@ public class NettyRemotingClient {
} }
/** /**
* start * start
*/ */
private void start(){ private void start() {
this.bootstrap this.bootstrap
.group(this.workerGroup) .group(this.workerGroup)
.channel(NioSocketChannel.class) .channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.handler(new ChannelInitializer<SocketChannel>() { .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
@Override .handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception { @Override
ch.pipeline().addLast( public void initChannel(SocketChannel ch) throws Exception {
new NettyDecoder(), ch.pipeline().addLast(
clientHandler, new NettyDecoder(),
encoder); clientHandler,
} encoder);
}); }
});
this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -159,10 +180,11 @@ public class NettyRemotingClient {
} }
/** /**
* async send * async send
* @param host host *
* @param command command * @param host host
* @param timeoutMillis timeoutMillis * @param command command
* @param timeoutMillis timeoutMillis
* @param invokeCallback callback function * @param invokeCallback callback function
* @throws InterruptedException * @throws InterruptedException
* @throws RemotingException * @throws RemotingException
@ -182,22 +204,22 @@ public class NettyRemotingClient {
* control concurrency number * control concurrency number
*/ */
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if(acquired){ if (acquired) {
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
/** /**
* response future * response future
*/ */
final ResponseFuture responseFuture = new ResponseFuture(opaque, final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis, timeoutMillis,
invokeCallback, invokeCallback,
releaseSemaphore); releaseSemaphore);
try { try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){ if (future.isSuccess()) {
responseFuture.setSendOk(true); responseFuture.setSendOk(true);
return; return;
} else { } else {
@ -207,28 +229,29 @@ public class NettyRemotingClient {
responseFuture.putResponse(null); responseFuture.putResponse(null);
try { try {
responseFuture.executeInvokeCallback(); responseFuture.executeInvokeCallback();
} catch (Throwable ex){ } catch (Throwable ex) {
logger.error("execute callback error", ex); logger.error("execute callback error", ex);
} finally{ } finally {
responseFuture.release(); responseFuture.release();
} }
} }
}); });
} catch (Throwable ex){ } catch (Throwable ex) {
responseFuture.release(); responseFuture.release();
throw new RemotingException(String.format("send command to host: %s failed", host), ex); throw new RemotingException(String.format("send command to host: %s failed", host), ex);
} }
} else{ } else {
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
throw new RemotingTooMuchRequestException(message); throw new RemotingTooMuchRequestException(message);
} }
} }
/** /**
* sync send * sync send
* @param host host *
* @param command command * @param host host
* @param command command
* @param timeoutMillis timeoutMillis * @param timeoutMillis timeoutMillis
* @return command * @return command
* @throws InterruptedException * @throws InterruptedException
@ -244,7 +267,7 @@ public class NettyRemotingClient {
channel.writeAndFlush(command).addListener(new ChannelFutureListener() { channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){ if (future.isSuccess()) {
responseFuture.setSendOk(true); responseFuture.setSendOk(true);
return; return;
} else { } else {
@ -259,10 +282,10 @@ public class NettyRemotingClient {
* sync wait for result * sync wait for result
*/ */
Command result = responseFuture.waitResponse(); Command result = responseFuture.waitResponse();
if(result == null){ if (result == null) {
if(responseFuture.isSendOK()){ if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause()); throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
} else{ } else {
throw new RemotingException(host.toString(), responseFuture.getCause()); throw new RemotingException(host.toString(), responseFuture.getCause());
} }
} }
@ -270,8 +293,9 @@ public class NettyRemotingClient {
} }
/** /**
* send task * send task
* @param host host *
* @param host host
* @param command command * @param command command
* @throws RemotingException * @throws RemotingException
*/ */
@ -296,33 +320,35 @@ public class NettyRemotingClient {
} }
/** /**
* register processor * register processor
*
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor); this.clientHandler.registerProcessor(commandType, processor, executor);
} }
/** /**
* get channel * get channel
*
* @param host * @param host
* @return * @return
*/ */
public Channel getChannel(Host host) { public Channel getChannel(Host host) {
Channel channel = channels.get(host); Channel channel = channels.get(host);
if(channel != null && channel.isActive()){ if (channel != null && channel.isActive()) {
return channel; return channel;
} }
return createChannel(host, true); return createChannel(host, true);
@ -330,17 +356,18 @@ public class NettyRemotingClient {
/** /**
* create channel * create channel
* @param host host *
* @param host host
* @param isSync sync flag * @param isSync sync flag
* @return channel * @return channel
*/ */
public Channel createChannel(Host host, boolean isSync) { public Channel createChannel(Host host, boolean isSync) {
ChannelFuture future; ChannelFuture future;
try { try {
synchronized (bootstrap){ synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
} }
if(isSync){ if (isSync) {
future.sync(); future.sync();
} }
if (future.isSuccess()) { if (future.isSuccess()) {
@ -358,16 +385,16 @@ public class NettyRemotingClient {
* close * close
*/ */
public void close() { public void close() {
if(isStarted.compareAndSet(true, false)){ if (isStarted.compareAndSet(true, false)) {
try { try {
closeChannels(); closeChannels();
if(workerGroup != null){ if (workerGroup != null) {
this.workerGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully();
} }
if(callbackExecutor != null){ if (callbackExecutor != null) {
this.callbackExecutor.shutdownNow(); this.callbackExecutor.shutdownNow();
} }
if(this.responseFutureExecutor != null){ if (this.responseFutureExecutor != null) {
this.responseFutureExecutor.shutdownNow(); this.responseFutureExecutor.shutdownNow();
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -378,9 +405,9 @@ public class NettyRemotingClient {
} }
/** /**
* close channels * close channels
*/ */
private void closeChannels(){ private void closeChannels() {
for (Channel channel : this.channels.values()) { for (Channel channel : this.channels.values()) {
channel.close(); channel.close();
} }
@ -389,11 +416,12 @@ public class NettyRemotingClient {
/** /**
* close channel * close channel
*
* @param host host * @param host host
*/ */
public void closeChannel(Host host){ public void closeChannel(Host host) {
Channel channel = this.channels.remove(host); Channel channel = this.channels.remove(host);
if(channel != null){ if (channel != null) {
channel.close(); channel.close();
} }
} }

166
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@ -17,14 +17,6 @@
package org.apache.dolphinscheduler.remote; package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -32,8 +24,7 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.slf4j.Logger; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -41,45 +32,58 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
/** /**
* remoting netty server * remoting netty server
*/ */
public class NettyRemotingServer { public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class); private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
/** /**
* server bootstrap * server bootstrap
*/ */
private final ServerBootstrap serverBootstrap = new ServerBootstrap(); private final ServerBootstrap serverBootstrap = new ServerBootstrap();
/** /**
* encoder * encoder
*/ */
private final NettyEncoder encoder = new NettyEncoder(); private final NettyEncoder encoder = new NettyEncoder();
/** /**
* default executor * default executor
*/ */
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
/** /**
* boss group * boss group
*/ */
private final NioEventLoopGroup bossGroup; private final EventLoopGroup bossGroup;
/** /**
* worker group * worker group
*/ */
private final NioEventLoopGroup workGroup; private final EventLoopGroup workGroup;
/** /**
* server config * server config
*/ */
private final NettyServerConfig serverConfig; private final NettyServerConfig serverConfig;
/** /**
* server handler * server handler
*/ */
private final NettyServerHandler serverHandler = new NettyServerHandler(this); private final NettyServerHandler serverHandler = new NettyServerHandler(this);
@ -89,82 +93,96 @@ public class NettyRemotingServer {
private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isStarted = new AtomicBoolean(false);
/** /**
* server init * server init
* *
* @param serverConfig server config * @param serverConfig server config
*/ */
public NettyRemotingServer(final NettyServerConfig serverConfig){ public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig; this.serverConfig = serverConfig;
if (NettyUtils.useEpoll()) {
this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { @Override
private AtomicInteger threadIndex = new AtomicInteger(0); public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});
@Override this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
public Thread newThread(Runnable r) { private AtomicInteger threadIndex = new AtomicInteger(0);
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { @Override
private AtomicInteger threadIndex = new AtomicInteger(0); public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
} else {
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
} }
}); });
}
/** this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
* server start private AtomicInteger threadIndex = new AtomicInteger(0);
*/
public void start(){
if(this.isStarted.get()){ @Override
return; public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
} }
}
this.serverBootstrap /**
* server start
*/
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup) .group(this.bossGroup, this.workGroup)
.channel(NioServerSocketChannel.class) .channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(NioSocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
initNettyChannel(ch); initNettyChannel(ch);
} }
}); });
ChannelFuture future; ChannelFuture future;
try { try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) { } catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit",e.getMessage(), e); logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
} }
if (future.isSuccess()) { if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) { } else if (future.cause() != null) {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
} else { } else {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
}
} }
//
isStarted.compareAndSet(false, true);
} }
/** /**
* init netty channel * init netty channel
*
* @param ch socket channel * @param ch socket channel
* @throws Exception
*/ */
private void initNettyChannel(NioSocketChannel ch) throws Exception{ private void initNettyChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder); pipeline.addLast("encoder", encoder);
pipeline.addLast("decoder", new NettyDecoder()); pipeline.addLast("decoder", new NettyDecoder());
@ -172,27 +190,29 @@ public class NettyRemotingServer {
} }
/** /**
* register processor * register processor
*
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor); this.serverHandler.registerProcessor(commandType, processor, executor);
} }
/** /**
* get default thread executor * get default thread executor
*
* @return thread executor * @return thread executor
*/ */
public ExecutorService getDefaultExecutor() { public ExecutorService getDefaultExecutor() {
@ -200,12 +220,12 @@ public class NettyRemotingServer {
} }
public void close() { public void close() {
if(isStarted.compareAndSet(true, false)){ if (isStarted.compareAndSet(true, false)) {
try { try {
if(bossGroup != null){ if (bossGroup != null) {
this.bossGroup.shutdownGracefully(); this.bossGroup.shutdownGracefully();
} }
if(workGroup != null){ if (workGroup != null) {
this.workGroup.shutdownGracefully(); this.workGroup.shutdownGracefully();
} }
if(defaultExecutor != null){ if(defaultExecutor != null){

7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -99,7 +99,12 @@ public enum CommandType {
TASK_KILL_RESPONSE, TASK_KILL_RESPONSE,
/** /**
* ping * HEART_BEAT
*/
HEART_BEAT,
/**
* ping
*/ */
PING, PING,

75
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java

@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.handler; package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
/** /**
* netty client request handler * netty client request handler
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter { public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/** /**
* netty client * netty client
*/ */
private final NettyRemotingClient nettyRemotingClient; private final NettyRemotingClient nettyRemotingClient;
private static byte[] heartBeatData = "heart_beat".getBytes();
/** /**
* callback thread executor * callback thread executor
*/ */
private final ExecutorService callbackExecutor; private final ExecutorService callbackExecutor;
@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors; private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
/** /**
* default executor * default executor
*/ */
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){ public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor; this.callbackExecutor = callbackExecutor;
this.processors = new ConcurrentHashMap(); this.processors = new ConcurrentHashMap();
} }
/** /**
* When the current channel is not active, * When the current channel is not active,
* the current channel has reached the end of its life cycle * the current channel has reached the end of its life cycle
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception * @throws Exception
@ -81,7 +91,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
} }
/** /**
* The current channel reads data from the remote * The current channel reads data from the remote
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param msg message * @param msg message
@ -89,55 +99,55 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg); processReceived(ctx.channel(), (Command) msg);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor; ExecutorService executorRef = executor;
if(executorRef == null){ if (executorRef == null) {
executorRef = defaultExecutor; executorRef = defaultExecutor;
} }
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
} }
/** /**
* process received logic * process received logic
* *
* @param command command * @param command command
*/ */
private void processReceived(final Channel channel, final Command command) { private void processReceived(final Channel channel, final Command command) {
ResponseFuture future = ResponseFuture.getFuture(command.getOpaque()); ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
if(future != null){ if (future != null) {
future.setResponseCommand(command); future.setResponseCommand(command);
future.release(); future.release();
if(future.getInvokeCallback() != null){ if (future.getInvokeCallback() != null) {
this.callbackExecutor.submit(new Runnable() { this.callbackExecutor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
future.executeInvokeCallback(); future.executeInvokeCallback();
} }
}); });
} else{ } else {
future.putResponse(command); future.putResponse(command);
} }
} else{ } else {
processByCommandType(channel, command); processByCommandType(channel, command);
} }
} }
@ -163,9 +173,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
} }
/** /**
* caught exception * caught exception
* @param ctx channel handler context *
* @param cause cause * @param ctx channel handler context
* @param cause cause
* @throws Exception * @throws Exception
*/ */
@Override @Override
@ -175,4 +186,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
ctx.channel().close(); ctx.channel().close();
} }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
Command heartBeat = new Command();
heartBeat.setType(CommandType.HEART_BEAT);
heartBeat.setBody(heartBeatData);
ctx.writeAndFlush(heartBeat)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
super.userEventTriggered(ctx, evt);
}
}
} }

67
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java

@ -17,22 +17,30 @@
package org.apache.dolphinscheduler.remote.handler; package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
/** /**
* netty server request handler * netty server request handler
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter { public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
/** /**
* netty remote server * netty remote server
*/ */
private final NettyRemotingServer nettyRemotingServer; private final NettyRemotingServer nettyRemotingServer;
/** /**
* server processors queue * server processors queue
*/ */
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap(); private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
public NettyServerHandler(NettyRemotingServer nettyRemotingServer){ public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
this.nettyRemotingServer = nettyRemotingServer; this.nettyRemotingServer = nettyRemotingServer;
} }
/** /**
* When the current channel is not active, * When the current channel is not active,
* the current channel has reached the end of its life cycle * the current channel has reached the end of its life cycle
*
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception * @throws Exception
*/ */
@ -73,38 +82,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg); processReceived(ctx.channel(), (Command) msg);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor; ExecutorService executorRef = executor;
if(executorRef == null){ if (executorRef == null) {
executorRef = nettyRemotingServer.getDefaultExecutor(); executorRef = nettyRemotingServer.getDefaultExecutor();
} }
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
} }
/** /**
* process received logic * process received logic
*
* @param channel channel * @param channel channel
* @param msg message * @param msg message
*/ */
private void processReceived(final Channel channel, final Command msg) { private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType(); final CommandType commandType = msg.getType();
@ -132,22 +142,22 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
} }
/** /**
* caught exception * caught exception
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param cause cause * @param cause cause
* @throws Exception * @throws Exception
*/ */
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}",cause.getMessage(), cause); logger.error("exceptionCaught : {}", cause.getMessage(), cause);
ctx.channel().close(); ctx.channel().close();
} }
/** /**
* channel write changed * channel write changed
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception * @throws Exception
*/ */
@Override @Override
@ -158,16 +168,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (!ch.isWritable()) { if (!ch.isWritable()) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}", logger.warn("{} is not writable, over high water level : {}",
ch, config.getWriteBufferHighWaterMark()); ch, config.getWriteBufferHighWaterMark());
} }
config.setAutoRead(false); config.setAutoRead(false);
} else { } else {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}", logger.warn("{} is writable, to low water : {}",
ch, config.getWriteBufferLowWaterMark()); ch, config.getWriteBufferLowWaterMark());
} }
config.setAutoRead(true); config.setAutoRead(true);
} }
} }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.channel().close();
} else {
super.userEventTriggered(ctx, evt);
}
}
} }

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.remote.utils;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
/** /**
* constant * constant
*/ */
@ -29,6 +28,10 @@ public class Constants {
public static final String SLASH = "/"; public static final String SLASH = "/";
public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000;
public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60;
/** /**
* charset * charset
*/ */

7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java

@ -0,0 +1,7 @@
package org.apache.dolphinscheduler.remote.utils;
/**
* Created by Administrator on 2020/10/14.
*/
public class NettyUtils {
}
Loading…
Cancel
Save