diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 38f00fb4fd..c1aea90393 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -17,17 +17,6 @@ package org.apache.dolphinscheduler.remote; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; - import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.Command; @@ -41,19 +30,40 @@ import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore; import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.handler.NettyClientHandler; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy; +import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NettyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.InetSocketAddress; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; + /** * remoting netty client */ @@ -162,11 +172,10 @@ public class NettyRemotingClient { .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) .handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new NettyDecoder(), - clientHandler, - encoder); + public void initChannel(SocketChannel ch) { + ch.pipeline() + .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) + .addLast(new NettyDecoder(), clientHandler, encoder); } }); this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index ad5c95bb38..867cf4dc56 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,11 +40,11 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; /** * remoting netty server @@ -183,10 +184,11 @@ public class NettyRemotingServer { * @param ch socket channel */ private void initNettyChannel(SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("encoder", encoder); - pipeline.addLast("decoder", new NettyDecoder()); - pipeline.addLast("handler", serverHandler); + ch.pipeline() + .addLast("encoder", encoder) + .addLast("decoder", new NettyDecoder()) + .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) + .addLast("handler", serverHandler); } /** diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index d1ffc65f57..4f477fb467 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -1 +1,105 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +public enum CommandType { + + /** + * remove task log request, + */ + REMOVE_TAK_LOG_REQUEST, + + /** + * remove task log response + */ + REMOVE_TAK_LOG_RESPONSE, + + /** + * roll view log request + */ + ROLL_VIEW_LOG_REQUEST, + + /** + * roll view log response + */ + ROLL_VIEW_LOG_RESPONSE, + + /** + * view whole log request + */ + VIEW_WHOLE_LOG_REQUEST, + + /** + * view whole log response + */ + VIEW_WHOLE_LOG_RESPONSE, + + /** + * get log bytes request + */ + GET_LOG_BYTES_REQUEST, + + /** + * get log bytes response + */ + GET_LOG_BYTES_RESPONSE, + + + WORKER_REQUEST, + MASTER_RESPONSE, + + /** + * execute task request + */ + TASK_EXECUTE_REQUEST, + + /** + * execute task ack + */ + TASK_EXECUTE_ACK, + + /** + * execute task response + */ + TASK_EXECUTE_RESPONSE, + + /** + * kill task + */ + TASK_KILL_REQUEST, + + /** + * kill task response + */ + TASK_KILL_RESPONSE, + + /** + * HEART_BEAT + */ + HEART_BEAT, + + /** + * ping + */ + PING, + + /** + * pong + */ + PONG; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index 48d78d9ad6..a988acfe17 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.handler; -import io.netty.channel.*; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + /** - * netty client request handler + * netty client request handler */ @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); /** - * netty client + * netty client */ private final NettyRemotingClient nettyRemotingClient; + private static byte[] heartBeatData = "heart_beat".getBytes(); + /** - * callback thread executor + * callback thread executor */ private final ExecutorService callbackExecutor; @@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final ConcurrentHashMap> processors; /** - * default executor + * default executor */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); - public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){ + public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) { this.nettyRemotingClient = nettyRemotingClient; this.callbackExecutor = callbackExecutor; this.processors = new ConcurrentHashMap(); } /** - * When the current channel is not active, - * the current channel has reached the end of its life cycle + * When the current channel is not active, + * the current channel has reached the end of its life cycle * * @param ctx channel handler context * @throws Exception @@ -81,7 +91,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } /** - * The current channel reads data from the remote + * The current channel reads data from the remote * * @param ctx channel handler context * @param msg message @@ -89,55 +99,55 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - processReceived(ctx.channel(), (Command)msg); + processReceived(ctx.channel(), (Command) msg); } /** * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; - if(executorRef == null){ + if (executorRef == null) { executorRef = defaultExecutor; } this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); } /** - * process received logic + * process received logic * * @param command command */ private void processReceived(final Channel channel, final Command command) { ResponseFuture future = ResponseFuture.getFuture(command.getOpaque()); - if(future != null){ + if (future != null) { future.setResponseCommand(command); future.release(); - if(future.getInvokeCallback() != null){ + if (future.getInvokeCallback() != null) { this.callbackExecutor.submit(new Runnable() { @Override public void run() { future.executeInvokeCallback(); } }); - } else{ + } else { future.putResponse(command); } - } else{ + } else { processByCommandType(channel, command); } } @@ -163,9 +173,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } /** - * caught exception - * @param ctx channel handler context - * @param cause cause + * caught exception + * + * @param ctx channel handler context + * @param cause cause * @throws Exception */ @Override @@ -175,4 +186,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ctx.channel().close(); } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + Command heartBeat = new Command(); + heartBeat.setType(CommandType.HEART_BEAT); + heartBeat.setBody(heartBeatData); + ctx.writeAndFlush(heartBeat) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + + } else { + super.userEventTriggered(ctx, evt); + } + } + } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java index da2a6ff8bf..09e41e9b54 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java @@ -17,22 +17,30 @@ package org.apache.dolphinscheduler.remote.handler; -import io.netty.channel.*; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + + /** - * netty server request handler + * netty server request handler */ @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { @@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); /** - * netty remote server + * netty remote server */ private final NettyRemotingServer nettyRemotingServer; /** - * server processors queue + * server processors queue */ private final ConcurrentHashMap> processors = new ConcurrentHashMap(); - public NettyServerHandler(NettyRemotingServer nettyRemotingServer){ + public NettyServerHandler(NettyRemotingServer nettyRemotingServer) { this.nettyRemotingServer = nettyRemotingServer; } /** - * When the current channel is not active, - * the current channel has reached the end of its life cycle + * When the current channel is not active, + * the current channel has reached the end of its life cycle + * * @param ctx channel handler context * @throws Exception */ @@ -73,38 +82,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - processReceived(ctx.channel(), (Command)msg); + processReceived(ctx.channel(), (Command) msg); } /** * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; - if(executorRef == null){ + if (executorRef == null) { executorRef = nettyRemotingServer.getDefaultExecutor(); } this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); } /** - * process received logic + * process received logic + * * @param channel channel - * @param msg message + * @param msg message */ private void processReceived(final Channel channel, final Command msg) { final CommandType commandType = msg.getType(); @@ -132,22 +142,22 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } /** - * caught exception + * caught exception * - * @param ctx channel handler context + * @param ctx channel handler context * @param cause cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("exceptionCaught : {}",cause.getMessage(), cause); + logger.error("exceptionCaught : {}", cause.getMessage(), cause); ctx.channel().close(); } /** - * channel write changed + * channel write changed * - * @param ctx channel handler context + * @param ctx channel handler context * @throws Exception */ @Override @@ -158,16 +168,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { if (!ch.isWritable()) { if (logger.isWarnEnabled()) { logger.warn("{} is not writable, over high water level : {}", - ch, config.getWriteBufferHighWaterMark()); + ch, config.getWriteBufferHighWaterMark()); } config.setAutoRead(false); } else { if (logger.isWarnEnabled()) { logger.warn("{} is writable, to low water : {}", - ch, config.getWriteBufferLowWaterMark()); + ch, config.getWriteBufferLowWaterMark()); } config.setAutoRead(true); } } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); + } else { + super.userEventTriggered(ctx, evt); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index 91d4ac245e..866ebb6c2b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.remote.utils; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; - /** * constant */ @@ -30,6 +29,10 @@ public class Constants { public static final String SLASH = "/"; + public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000; + + public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60; + /** * charset */