From f0ff8e2eb01c618087139c3e0061ec363f043b85 Mon Sep 17 00:00:00 2001 From: Kirs Date: Mon, 12 Oct 2020 23:29:38 +0800 Subject: [PATCH 1/5] [FIX_#3789][remote]cherry pick from dev to support netty heart beat --- .../remote/NettyRemotingClient.java | 190 ++++++++++-------- .../remote/NettyRemotingServer.java | 166 ++++++++------- .../remote/command/CommandType.java | 7 +- .../remote/handler/NettyClientHandler.java | 75 ++++--- .../remote/handler/NettyServerHandler.java | 67 +++--- .../remote/utils/Constants.java | 5 +- .../remote/utils/NettyUtils.java | 7 + 7 files changed, 312 insertions(+), 205 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 10e62d8d9b..38f00fb4fd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -18,10 +18,16 @@ package org.apache.dolphinscheduler.remote; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; + import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.Command; @@ -38,6 +44,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.remote.utils.NettyUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** - * remoting netty client + * remoting netty client */ public class NettyRemotingClient { @@ -59,7 +67,7 @@ public class NettyRemotingClient { private final Bootstrap bootstrap = new Bootstrap(); /** - * encoder + * encoder */ private final NettyEncoder encoder = new NettyEncoder(); @@ -69,57 +77,69 @@ public class NettyRemotingClient { private final ConcurrentHashMap channels = new ConcurrentHashMap(128); /** - * started flag + * started flag */ private final AtomicBoolean isStarted = new AtomicBoolean(false); /** - * worker group + * worker group */ - private final NioEventLoopGroup workerGroup; + private final EventLoopGroup workerGroup; /** - * client config + * client config */ private final NettyClientConfig clientConfig; /** - * saync semaphore + * saync semaphore */ private final Semaphore asyncSemaphore = new Semaphore(200, true); /** - * callback thread executor + * callback thread executor */ private final ExecutorService callbackExecutor; /** - * client handler + * client handler */ private final NettyClientHandler clientHandler; /** - * response future executor + * response future executor */ private final ScheduledExecutorService responseFutureExecutor; /** - * client init + * client init + * * @param clientConfig client config */ - public NettyRemotingClient(final NettyClientConfig clientConfig){ + public NettyRemotingClient(final NettyClientConfig clientConfig) { this.clientConfig = clientConfig; - this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + if (NettyUtils.useEpoll()) { + this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); - } - }); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); + } + }); + } else { + this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); + } + }); + } this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), - new CallerThreadExecutePolicy()); + new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), + new CallerThreadExecutePolicy()); this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); @@ -128,26 +148,27 @@ public class NettyRemotingClient { } /** - * start + * start */ - private void start(){ + private void start() { this.bootstrap - .group(this.workerGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) - .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) - .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) - .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new NettyDecoder(), - clientHandler, - encoder); - } - }); + .group(this.workerGroup) + .channel(NettyUtils.getSocketChannelClass()) + .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) + .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) + .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) + .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new NettyDecoder(), + clientHandler, + encoder); + } + }); this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -159,10 +180,11 @@ public class NettyRemotingClient { } /** - * async send - * @param host host - * @param command command - * @param timeoutMillis timeoutMillis + * async send + * + * @param host host + * @param command command + * @param timeoutMillis timeoutMillis * @param invokeCallback callback function * @throws InterruptedException * @throws RemotingException @@ -182,22 +204,22 @@ public class NettyRemotingClient { * control concurrency number */ boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); - if(acquired){ + if (acquired) { final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); /** * response future */ final ResponseFuture responseFuture = new ResponseFuture(opaque, - timeoutMillis, - invokeCallback, - releaseSemaphore); + timeoutMillis, + invokeCallback, + releaseSemaphore); try { - channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ + channel.writeAndFlush(command).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ + if (future.isSuccess()) { responseFuture.setSendOk(true); return; } else { @@ -207,28 +229,29 @@ public class NettyRemotingClient { responseFuture.putResponse(null); try { responseFuture.executeInvokeCallback(); - } catch (Throwable ex){ + } catch (Throwable ex) { logger.error("execute callback error", ex); - } finally{ + } finally { responseFuture.release(); } } }); - } catch (Throwable ex){ + } catch (Throwable ex) { responseFuture.release(); throw new RemotingException(String.format("send command to host: %s failed", host), ex); } - } else{ + } else { String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", - timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); + timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); throw new RemotingTooMuchRequestException(message); } } /** * sync send - * @param host host - * @param command command + * + * @param host host + * @param command command * @param timeoutMillis timeoutMillis * @return command * @throws InterruptedException @@ -244,7 +267,7 @@ public class NettyRemotingClient { channel.writeAndFlush(command).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ + if (future.isSuccess()) { responseFuture.setSendOk(true); return; } else { @@ -259,10 +282,10 @@ public class NettyRemotingClient { * sync wait for result */ Command result = responseFuture.waitResponse(); - if(result == null){ - if(responseFuture.isSendOK()){ + if (result == null) { + if (responseFuture.isSendOK()) { throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause()); - } else{ + } else { throw new RemotingException(host.toString(), responseFuture.getCause()); } } @@ -270,8 +293,9 @@ public class NettyRemotingClient { } /** - * send task - * @param host host + * send task + * + * @param host host * @param command command * @throws RemotingException */ @@ -296,33 +320,35 @@ public class NettyRemotingClient { } /** - * register processor + * register processor + * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.clientHandler.registerProcessor(commandType, processor, executor); } /** - * get channel + * get channel + * * @param host * @return */ public Channel getChannel(Host host) { Channel channel = channels.get(host); - if(channel != null && channel.isActive()){ + if (channel != null && channel.isActive()) { return channel; } return createChannel(host, true); @@ -330,17 +356,18 @@ public class NettyRemotingClient { /** * create channel - * @param host host + * + * @param host host * @param isSync sync flag * @return channel */ public Channel createChannel(Host host, boolean isSync) { ChannelFuture future; try { - synchronized (bootstrap){ + synchronized (bootstrap) { future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); } - if(isSync){ + if (isSync) { future.sync(); } if (future.isSuccess()) { @@ -358,16 +385,16 @@ public class NettyRemotingClient { * close */ public void close() { - if(isStarted.compareAndSet(true, false)){ + if (isStarted.compareAndSet(true, false)) { try { closeChannels(); - if(workerGroup != null){ + if (workerGroup != null) { this.workerGroup.shutdownGracefully(); } - if(callbackExecutor != null){ + if (callbackExecutor != null) { this.callbackExecutor.shutdownNow(); } - if(this.responseFutureExecutor != null){ + if (this.responseFutureExecutor != null) { this.responseFutureExecutor.shutdownNow(); } } catch (Exception ex) { @@ -378,9 +405,9 @@ public class NettyRemotingClient { } /** - * close channels + * close channels */ - private void closeChannels(){ + private void closeChannels() { for (Channel channel : this.channels.values()) { channel.close(); } @@ -389,11 +416,12 @@ public class NettyRemotingClient { /** * close channel + * * @param host host */ - public void closeChannel(Host host){ + public void closeChannel(Host host) { Channel channel = this.channels.remove(host); - if(channel != null){ + if (channel != null) { channel.close(); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index dbeb318f2d..c3bdc97551 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -17,14 +17,6 @@ package org.apache.dolphinscheduler.remote; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -32,8 +24,7 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Constants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.dolphinscheduler.remote.utils.NettyUtils; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,45 +32,58 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; + /** - * remoting netty server + * remoting netty server */ public class NettyRemotingServer { private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class); /** - * server bootstrap + * server bootstrap */ private final ServerBootstrap serverBootstrap = new ServerBootstrap(); /** - * encoder + * encoder */ private final NettyEncoder encoder = new NettyEncoder(); /** - * default executor + * default executor */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); /** * boss group */ - private final NioEventLoopGroup bossGroup; + private final EventLoopGroup bossGroup; /** - * worker group + * worker group */ - private final NioEventLoopGroup workGroup; + private final EventLoopGroup workGroup; /** - * server config + * server config */ private final NettyServerConfig serverConfig; /** - * server handler + * server handler */ private final NettyServerHandler serverHandler = new NettyServerHandler(this); @@ -89,82 +93,96 @@ public class NettyRemotingServer { private final AtomicBoolean isStarted = new AtomicBoolean(false); /** - * server init + * server init * * @param serverConfig server config */ - public NettyRemotingServer(final NettyServerConfig serverConfig){ + public NettyRemotingServer(final NettyServerConfig serverConfig) { this.serverConfig = serverConfig; + if (NettyUtils.useEpoll()) { + this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); + } + }); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); - } - }); + this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); + } + }); + } else { + this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); - } - }); - } + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); + } + }); - /** - * server start - */ - public void start(){ + this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - if(this.isStarted.get()){ - return; + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); + } + }); } + } - this.serverBootstrap + /** + * server start + */ + public void start() { + if (isStarted.compareAndSet(false, true)) { + this.serverBootstrap .group(this.bossGroup, this.workGroup) - .channel(NioServerSocketChannel.class) + .channel(NettyUtils.getServerSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) - .childHandler(new ChannelInitializer() { + .childHandler(new ChannelInitializer() { @Override - protected void initChannel(NioSocketChannel ch) throws Exception { + protected void initChannel(SocketChannel ch) throws Exception { initNettyChannel(ch); } }); - ChannelFuture future; - try { - future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); - } catch (Exception e) { - logger.error("NettyRemotingServer bind fail {}, exit",e.getMessage(), e); - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); - } - if (future.isSuccess()) { - logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); - } else if (future.cause() != null) { - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); - } else { - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + ChannelFuture future; + try { + future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); + } catch (Exception e) { + logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + } + if (future.isSuccess()) { + logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); + } else if (future.cause() != null) { + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); + } else { + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + } } - // - isStarted.compareAndSet(false, true); } /** - * init netty channel + * init netty channel + * * @param ch socket channel - * @throws Exception */ - private void initNettyChannel(NioSocketChannel ch) throws Exception{ + private void initNettyChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder", encoder); pipeline.addLast("decoder", new NettyDecoder()); @@ -172,27 +190,29 @@ public class NettyRemotingServer { } /** - * register processor + * register processor + * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.serverHandler.registerProcessor(commandType, processor, executor); } /** - * get default thread executor + * get default thread executor + * * @return thread executor */ public ExecutorService getDefaultExecutor() { @@ -200,12 +220,12 @@ public class NettyRemotingServer { } public void close() { - if(isStarted.compareAndSet(true, false)){ + if (isStarted.compareAndSet(true, false)) { try { - if(bossGroup != null){ + if (bossGroup != null) { this.bossGroup.shutdownGracefully(); } - if(workGroup != null){ + if (workGroup != null) { this.workGroup.shutdownGracefully(); } if(defaultExecutor != null){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 17c1e44fd3..77c6064e5b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -99,7 +99,12 @@ public enum CommandType { TASK_KILL_RESPONSE, /** - * ping + * HEART_BEAT + */ + HEART_BEAT, + + /** + * ping */ PING, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index 48d78d9ad6..a988acfe17 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.handler; -import io.netty.channel.*; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + /** - * netty client request handler + * netty client request handler */ @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); /** - * netty client + * netty client */ private final NettyRemotingClient nettyRemotingClient; + private static byte[] heartBeatData = "heart_beat".getBytes(); + /** - * callback thread executor + * callback thread executor */ private final ExecutorService callbackExecutor; @@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final ConcurrentHashMap> processors; /** - * default executor + * default executor */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); - public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){ + public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) { this.nettyRemotingClient = nettyRemotingClient; this.callbackExecutor = callbackExecutor; this.processors = new ConcurrentHashMap(); } /** - * When the current channel is not active, - * the current channel has reached the end of its life cycle + * When the current channel is not active, + * the current channel has reached the end of its life cycle * * @param ctx channel handler context * @throws Exception @@ -81,7 +91,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } /** - * The current channel reads data from the remote + * The current channel reads data from the remote * * @param ctx channel handler context * @param msg message @@ -89,55 +99,55 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - processReceived(ctx.channel(), (Command)msg); + processReceived(ctx.channel(), (Command) msg); } /** * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; - if(executorRef == null){ + if (executorRef == null) { executorRef = defaultExecutor; } this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); } /** - * process received logic + * process received logic * * @param command command */ private void processReceived(final Channel channel, final Command command) { ResponseFuture future = ResponseFuture.getFuture(command.getOpaque()); - if(future != null){ + if (future != null) { future.setResponseCommand(command); future.release(); - if(future.getInvokeCallback() != null){ + if (future.getInvokeCallback() != null) { this.callbackExecutor.submit(new Runnable() { @Override public void run() { future.executeInvokeCallback(); } }); - } else{ + } else { future.putResponse(command); } - } else{ + } else { processByCommandType(channel, command); } } @@ -163,9 +173,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } /** - * caught exception - * @param ctx channel handler context - * @param cause cause + * caught exception + * + * @param ctx channel handler context + * @param cause cause * @throws Exception */ @Override @@ -175,4 +186,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ctx.channel().close(); } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + Command heartBeat = new Command(); + heartBeat.setType(CommandType.HEART_BEAT); + heartBeat.setBody(heartBeatData); + ctx.writeAndFlush(heartBeat) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + + } else { + super.userEventTriggered(ctx, evt); + } + } + } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java index da2a6ff8bf..09e41e9b54 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java @@ -17,22 +17,30 @@ package org.apache.dolphinscheduler.remote.handler; -import io.netty.channel.*; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + + /** - * netty server request handler + * netty server request handler */ @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { @@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); /** - * netty remote server + * netty remote server */ private final NettyRemotingServer nettyRemotingServer; /** - * server processors queue + * server processors queue */ private final ConcurrentHashMap> processors = new ConcurrentHashMap(); - public NettyServerHandler(NettyRemotingServer nettyRemotingServer){ + public NettyServerHandler(NettyRemotingServer nettyRemotingServer) { this.nettyRemotingServer = nettyRemotingServer; } /** - * When the current channel is not active, - * the current channel has reached the end of its life cycle + * When the current channel is not active, + * the current channel has reached the end of its life cycle + * * @param ctx channel handler context * @throws Exception */ @@ -73,38 +82,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - processReceived(ctx.channel(), (Command)msg); + processReceived(ctx.channel(), (Command) msg); } /** * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; - if(executorRef == null){ + if (executorRef == null) { executorRef = nettyRemotingServer.getDefaultExecutor(); } this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); } /** - * process received logic + * process received logic + * * @param channel channel - * @param msg message + * @param msg message */ private void processReceived(final Channel channel, final Command msg) { final CommandType commandType = msg.getType(); @@ -132,22 +142,22 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } /** - * caught exception + * caught exception * - * @param ctx channel handler context + * @param ctx channel handler context * @param cause cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("exceptionCaught : {}",cause.getMessage(), cause); + logger.error("exceptionCaught : {}", cause.getMessage(), cause); ctx.channel().close(); } /** - * channel write changed + * channel write changed * - * @param ctx channel handler context + * @param ctx channel handler context * @throws Exception */ @Override @@ -158,16 +168,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { if (!ch.isWritable()) { if (logger.isWarnEnabled()) { logger.warn("{} is not writable, over high water level : {}", - ch, config.getWriteBufferHighWaterMark()); + ch, config.getWriteBufferHighWaterMark()); } config.setAutoRead(false); } else { if (logger.isWarnEnabled()) { logger.warn("{} is writable, to low water : {}", - ch, config.getWriteBufferLowWaterMark()); + ch, config.getWriteBufferLowWaterMark()); } config.setAutoRead(true); } } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); + } else { + super.userEventTriggered(ctx, evt); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index 48736ca694..ee19da5fcd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.remote.utils; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; - /** * constant */ @@ -29,6 +28,10 @@ public class Constants { public static final String SLASH = "/"; + public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000; + + public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60; + /** * charset */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java new file mode 100644 index 0000000000..ad267d0c2b --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.remote.utils; + +/** + * Created by Administrator on 2020/10/14. + */ +public class NettyUtils { +} From b368c53784b1a524d8ad834e6e598cb99de31364 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 14 Oct 2020 18:52:37 +0800 Subject: [PATCH 2/5] [FIX_#3789][remote]cherry pick from dev to support netty heart beat --- .../remote/config/NettyClientConfig.java | 12 ++++ .../remote/utils/Constants.java | 10 ++++ .../remote/utils/NettyUtils.java | 59 ++++++++++++++++++- 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java index 831e05f7e7..34783369d2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java @@ -48,6 +48,11 @@ public class NettyClientConfig { */ private int receiveBufferSize = 65535; + /** + * connect timeout millis + */ + private int connectTimeoutMillis = 3000; + public int getWorkerThreads() { return workerThreads; } @@ -88,4 +93,11 @@ public class NettyClientConfig { this.receiveBufferSize = receiveBufferSize; } + public int getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public void setConnectTimeoutMillis(int connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index ee19da5fcd..43b0789e20 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -45,4 +45,14 @@ public class Constants { public static final String LOCAL_ADDRESS = IPUtils.getFirstNoLoopbackIP4Address(); + /** + * netty epoll enable switch + */ + public static final String NETTY_EPOLL_ENABLE = System.getProperty("netty.epoll.enable", "true"); + + /** + * OS Name + */ + public static final String OS_NAME = System.getProperty("os.name"); + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java index ad267d0c2b..cc62287ebf 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java @@ -1,7 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.utils; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + /** - * Created by Administrator on 2020/10/14. + * NettyUtils */ public class NettyUtils { -} + + private NettyUtils() { + } + + public static boolean useEpoll() { + String osName = Constants.OS_NAME; + if (!osName.toLowerCase().contains("linux")) { + return false; + } + if (!Epoll.isAvailable()) { + return false; + } + String enableNettyEpoll = Constants.NETTY_EPOLL_ENABLE; + return Boolean.parseBoolean(enableNettyEpoll); + } + + public static Class getServerSocketChannelClass() { + if (useEpoll()) { + return EpollServerSocketChannel.class; + } + return NioServerSocketChannel.class; + } + + public static Class getSocketChannelClass() { + if (useEpoll()) { + return EpollSocketChannel.class; + } + return NioSocketChannel.class; + } + +} \ No newline at end of file From adda0eb4d0366593bbcf3a3cd9e21617c2e45502 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Thu, 15 Oct 2020 15:51:16 +0800 Subject: [PATCH 3/5] [fix-3843][api] When update workflow definition,if name already exists, the prompt is not friendly --- .../org/apache/dolphinscheduler/api/enums/Status.java | 2 +- .../api/service/ProcessDefinitionService.java | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 9bb8e65e4b..f5390e99e2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -132,7 +132,7 @@ public enum Status { QUERY_TASK_INSTANCE_LOG_ERROR(10103,"view task instance log error", "查询任务实例日志错误"), DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR(10104,"download task instance log file error", "下载任务日志文件错误"), CREATE_PROCESS_DEFINITION(10105,"create process definition", "创建工作流错误"), - VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR(10106,"verify process definition name unique error", "工作流名称已存在"), + VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR(10106,"verify process definition name unique error", "工作流定义名称已存在"), UPDATE_PROCESS_DEFINITION_ERROR(10107,"update process definition error", "更新工作流定义错误"), RELEASE_PROCESS_DEFINITION_ERROR(10108,"release process definition error", "上线工作流错误"), QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR(10109,"query datail of process definition error", "查询工作流详细信息错误"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 2ae720da3b..a28bbc4b95 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -382,6 +382,14 @@ public class ProcessDefinitionService extends BaseDAGService { return checkProcessJson; } ProcessDefinition processDefine = processService.findProcessDefineById(id); + if (!name.equals(processDefine.getName())) { + ProcessDefinition definition = processDefineMapper.verifyByDefineName(project.getId(), name); + if (definition != null) { + putMsg(result, Status.VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR, name); + return result; + } + } + if (processDefine == null) { // check process definition exists putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id); From ac9f5dc271163d3db50b05e12c627ea8f07ba9ee Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Thu, 15 Oct 2020 16:48:48 +0800 Subject: [PATCH 4/5] [fix-3843][api] When update workflow definition,if name already exists, the prompt is not friendly --- .../api/service/ProcessDefinitionService.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index a28bbc4b95..2312c99f0e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -382,24 +382,25 @@ public class ProcessDefinitionService extends BaseDAGService { return checkProcessJson; } ProcessDefinition processDefine = processService.findProcessDefineById(id); - if (!name.equals(processDefine.getName())) { - ProcessDefinition definition = processDefineMapper.verifyByDefineName(project.getId(), name); - if (definition != null) { - putMsg(result, Status.VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR, name); - return result; - } - } - + // check process definition exists if (processDefine == null) { - // check process definition exists putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id); return result; - } else if (processDefine.getReleaseState() == ReleaseState.ONLINE) { + } + + if (processDefine.getReleaseState() == ReleaseState.ONLINE) { // online can not permit edit putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefine.getName()); return result; - } else { - putMsg(result, Status.SUCCESS); + } + + if (!name.equals(processDefine.getName())) { + // check whether the new process define name exist + ProcessDefinition definition = processDefineMapper.verifyByDefineName(project.getId(), name); + if (definition != null) { + putMsg(result, Status.VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR, name); + return result; + } } Date now = new Date(); From dbc9f149b5790d8421564e16be6a3c9e8bfdfda2 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Thu, 15 Oct 2020 17:52:45 +0800 Subject: [PATCH 5/5] [Fix-#3487][sql] update uc_dolphin_T_t_ds_resources_un --- .../postgresql/dolphinscheduler_ddl.sql | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql index 59b913be99..bb7d9f0d03 100644 --- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql @@ -16,18 +16,19 @@ */ -- add t_ds_resources_un +delimiter d// CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_resources_un() RETURNS void AS $$ BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.KEY_COLUMN_USAGE - WHERE TABLE_NAME = 't_ds_resources' - AND CONSTRAINT_NAME = 't_ds_resources_un' - ) + IF NOT EXISTS (SELECT 1 FROM information_schema.KEY_COLUMN_USAGE + WHERE TABLE_NAME = 't_ds_resources' + AND CONSTRAINT_NAME = 't_ds_resources_un') THEN -ALTER TABLE t_ds_resources ADD CONSTRAINT t_ds_resources_un UNIQUE (full_name,"type"); -END IF; + ALTER TABLE t_ds_resources ADD CONSTRAINT t_ds_resources_un UNIQUE (full_name,"type"); + END IF; END; $$ LANGUAGE plpgsql; +d// +delimiter ; SELECT uc_dolphin_T_t_ds_resources_un(); DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_resources_un();