From 047b0a312f69a9b884656f53110df791f381f8a8 Mon Sep 17 00:00:00 2001 From: Segun Ogundipe Date: Sat, 30 Jan 2021 14:32:13 +0100 Subject: [PATCH] [Improvement][remote] Refactor remote module to fix code smell (#4576) * chore[remote]: Refactor remote module to fix code smell * chore[remote]: Add licence header * chore[remote]: Clean code smell * chore[remote]: Clean code smell * chore[remote]: Remove unused import --- .../remote/NettyRemotingClient.java | 150 ++++++++---------- .../remote/NettyRemotingServer.java | 64 ++++---- .../remote/codec/NettyEncoder.java | 12 +- .../command/TaskKillResponseCommand.java | 6 +- .../remote/exceptions/RemoteException.java | 42 +++++ .../remote/future/ResponseFuture.java | 63 ++++---- .../remote/handler/NettyClientHandler.java | 34 ++-- .../remote/handler/NettyServerHandler.java | 40 ++--- .../remote/utils/ChannelUtils.java | 31 ++-- .../remote/utils/Constants.java | 4 + .../remote/utils/IPUtils.java | 25 +-- .../dolphinscheduler/remote/utils/Pair.java | 5 +- 12 files changed, 252 insertions(+), 224 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemoteException.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index c1aea90393..56033730bc 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -84,7 +83,7 @@ public class NettyRemotingClient { /** * channels */ - private final ConcurrentHashMap channels = new ConcurrentHashMap(128); + private final ConcurrentHashMap channels = new ConcurrentHashMap<>(128); /** * started flag @@ -130,7 +129,7 @@ public class NettyRemotingClient { this.clientConfig = clientConfig; if (NettyUtils.useEpoll()) { this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { @@ -139,7 +138,7 @@ public class NettyRemotingClient { }); } else { this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { @@ -148,8 +147,8 @@ public class NettyRemotingClient { }); } this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), - new CallerThreadExecutePolicy()); + new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), + new CallerThreadExecutePolicy()); this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); @@ -163,40 +162,32 @@ public class NettyRemotingClient { private void start() { this.bootstrap - .group(this.workerGroup) - .channel(NettyUtils.getSocketChannelClass()) - .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) - .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) - .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) - .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) { - ch.pipeline() - .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) - .addLast(new NettyDecoder(), clientHandler, encoder); - } - }); - this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - ResponseFuture.scanFutureTable(); - } - }, 5000, 1000, TimeUnit.MILLISECONDS); - // + .group(this.workerGroup) + .channel(NettyUtils.getSocketChannelClass()) + .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) + .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) + .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) + .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline() + .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) + .addLast(new NettyDecoder(), clientHandler, encoder); + } + }); + this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS); isStarted.compareAndSet(false, true); } /** * async send * - * @param host host - * @param command command - * @param timeoutMillis timeoutMillis + * @param host host + * @param command command + * @param timeoutMillis timeoutMillis * @param invokeCallback callback function - * @throws InterruptedException - * @throws RemotingException */ public void sendAsync(final Host host, final Command command, final long timeoutMillis, @@ -205,53 +196,49 @@ public class NettyRemotingClient { if (channel == null) { throw new RemotingException("network error"); } - /** + /* * request unique identification */ final long opaque = command.getOpaque(); - /** + /* * control concurrency number */ boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); - /** + /* * response future */ final ResponseFuture responseFuture = new ResponseFuture(opaque, - timeoutMillis, - invokeCallback, - releaseSemaphore); + timeoutMillis, + invokeCallback, + releaseSemaphore); try { - channel.writeAndFlush(command).addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - responseFuture.setSendOk(true); - return; - } else { - responseFuture.setSendOk(false); - } - responseFuture.setCause(future.cause()); - responseFuture.putResponse(null); - try { - responseFuture.executeInvokeCallback(); - } catch (Throwable ex) { - logger.error("execute callback error", ex); - } finally { - responseFuture.release(); - } + channel.writeAndFlush(command).addListener(future -> { + if (future.isSuccess()) { + responseFuture.setSendOk(true); + return; + } else { + responseFuture.setSendOk(false); + } + responseFuture.setCause(future.cause()); + responseFuture.putResponse(null); + try { + responseFuture.executeInvokeCallback(); + } catch (Exception ex) { + logger.error("execute callback error", ex); + } finally { + responseFuture.release(); } }); - } catch (Throwable ex) { + } catch (Exception ex) { responseFuture.release(); throw new RemotingException(String.format("send command to host: %s failed", host), ex); } } else { String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", - timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); + timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); throw new RemotingTooMuchRequestException(message); } } @@ -259,12 +246,10 @@ public class NettyRemotingClient { /** * sync send * - * @param host host - * @param command command + * @param host host + * @param command command * @param timeoutMillis timeoutMillis * @return command - * @throws InterruptedException - * @throws RemotingException */ public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException { final Channel channel = getChannel(host); @@ -273,21 +258,18 @@ public class NettyRemotingClient { } final long opaque = command.getOpaque(); final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); - channel.writeAndFlush(command).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - responseFuture.setSendOk(true); - return; - } else { - responseFuture.setSendOk(false); - } - responseFuture.setCause(future.cause()); - responseFuture.putResponse(null); - logger.error("send command {} to host {} failed", command, host); + channel.writeAndFlush(command).addListener(future -> { + if (future.isSuccess()) { + responseFuture.setSendOk(true); + return; + } else { + responseFuture.setSendOk(false); } + responseFuture.setCause(future.cause()); + responseFuture.putResponse(null); + logger.error("send command {} to host {} failed", command, host); }); - /** + /* * sync wait for result */ Command result = responseFuture.waitResponse(); @@ -304,9 +286,8 @@ public class NettyRemotingClient { /** * send task * - * @param host host + * @param host host * @param command command - * @throws RemotingException */ public void send(final Host host, final Command command) throws RemotingException { Channel channel = getChannel(host); @@ -332,7 +313,7 @@ public class NettyRemotingClient { * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); @@ -342,8 +323,8 @@ public class NettyRemotingClient { * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.clientHandler.registerProcessor(commandType, processor, executor); @@ -351,9 +332,6 @@ public class NettyRemotingClient { /** * get channel - * - * @param host - * @return */ public Channel getChannel(Host host) { Channel channel = channels.get(host); @@ -366,7 +344,7 @@ public class NettyRemotingClient { /** * create channel * - * @param host host + * @param host host * @param isSync sync flag * @return channel */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index 867cf4dc56..f4a88699ec 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemoteException; import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Constants; @@ -93,6 +94,11 @@ public class NettyRemotingServer { */ private final AtomicBoolean isStarted = new AtomicBoolean(false); + /** + * Netty server bind fail message + */ + private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer bind %s fail"; + /** * server init * @@ -102,7 +108,7 @@ public class NettyRemotingServer { this.serverConfig = serverConfig; if (NettyUtils.useEpoll()) { this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { @@ -111,7 +117,7 @@ public class NettyRemotingServer { }); this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { @@ -120,7 +126,7 @@ public class NettyRemotingServer { }); } else { this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { @@ -129,7 +135,7 @@ public class NettyRemotingServer { }); this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { @@ -145,35 +151,35 @@ public class NettyRemotingServer { public void start() { if (isStarted.compareAndSet(false, true)) { this.serverBootstrap - .group(this.bossGroup, this.workGroup) - .channel(NettyUtils.getServerSocketChannelClass()) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) - .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) - .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) - .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) - .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) - .childHandler(new ChannelInitializer() { - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - initNettyChannel(ch); - } - }); + .group(this.bossGroup, this.workGroup) + .channel(NettyUtils.getServerSocketChannelClass()) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) + .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) + .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) + .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) + .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { + initNettyChannel(ch); + } + }); ChannelFuture future; try { future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); } catch (Exception e) { logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort())); } if (future.isSuccess()) { logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); } else if (future.cause() != null) { - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); + throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause()); } else { - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort())); } } } @@ -185,17 +191,17 @@ public class NettyRemotingServer { */ private void initNettyChannel(SocketChannel ch) { ch.pipeline() - .addLast("encoder", encoder) - .addLast("decoder", new NettyDecoder()) - .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) - .addLast("handler", serverHandler); + .addLast("encoder", encoder) + .addLast("decoder", new NettyDecoder()) + .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) + .addLast("handler", serverHandler); } /** * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); @@ -205,8 +211,8 @@ public class NettyRemotingServer { * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.serverHandler.registerProcessor(commandType, processor, executor); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java index 785ee5aaf2..04487ff686 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java @@ -14,16 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.codec; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import org.apache.dolphinscheduler.remote.command.Command; /** - * netty encoder + * netty encoder */ @Sharable public class NettyEncoder extends MessageToByteEncoder { @@ -34,12 +37,11 @@ public class NettyEncoder extends MessageToByteEncoder { * @param ctx channel handler context * @param msg command * @param out byte buffer - * @throws Exception */ @Override protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception { - if(msg == null){ - throw new Exception("encode msg is null"); + if (msg == null) { + throw new RemotingException("encode msg is null"); } out.writeByte(Command.MAGIC); out.writeByte(Command.VERSION); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java index f77221d2e5..03ad4dd694 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java @@ -23,7 +23,7 @@ import java.io.Serializable; import java.util.List; /** - * kill task response command + * kill task response command */ public class TaskKillResponseCommand implements Serializable { @@ -51,7 +51,7 @@ public class TaskKillResponseCommand implements Serializable { /** * other resource manager appId , for example : YARN etc */ - protected List appIds; + private List appIds; public int getTaskInstanceId() { return taskInstanceId; @@ -94,7 +94,7 @@ public class TaskKillResponseCommand implements Serializable { } /** - * package request command + * package request command * * @return command */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemoteException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemoteException.java new file mode 100644 index 0000000000..a13859026d --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemoteException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.exceptions; + +/** + * Custom runtime exception + */ +public class RemoteException extends RuntimeException { + + /** + * Construct a new runtime exception with the detail message + * + * @param message detail message + */ + public RemoteException(String message) { + super(message); + } + + /** + * Construct a new runtime exception with the detail message and cause + * + * @param message detail message + */ + public RemoteException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java index bbb32c76d1..f9de653d90 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java @@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.remote.future; import org.apache.dolphinscheduler.remote.command.Command; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * response future @@ -34,25 +37,25 @@ public class ResponseFuture { private static final Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class); - private static final ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); + private static final ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); /** - * request unique identification + * request unique identification */ private final long opaque; /** - * timeout + * timeout */ private final long timeoutMillis; /** - * invokeCallback function + * invokeCallback function */ private final InvokeCallback invokeCallback; /** - * releaseSemaphore + * releaseSemaphore */ private final ReleaseSemaphore releaseSemaphore; @@ -61,7 +64,7 @@ public class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); /** - * response command + * response command */ private Command responseCommand; @@ -78,10 +81,9 @@ public class ResponseFuture { } /** - * wait for response + * wait for response * * @return command - * @throws InterruptedException */ public Command waitResponse() throws InterruptedException { this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); @@ -89,7 +91,7 @@ public class ResponseFuture { } /** - * put response + * put response * * @param responseCommand responseCommand */ @@ -99,12 +101,13 @@ public class ResponseFuture { FUTURE_TABLE.remove(opaque); } - public static ResponseFuture getFuture(long opaque){ + public static ResponseFuture getFuture(long opaque) { return FUTURE_TABLE.get(opaque); } /** - * whether timeout + * whether timeout + * * @return timeout */ public boolean isTimeout() { @@ -113,7 +116,7 @@ public class ResponseFuture { } /** - * execute invoke callback + * execute invoke callback */ public void executeInvokeCallback() { if (invokeCallback != null) { @@ -162,10 +165,10 @@ public class ResponseFuture { } /** - * release + * release */ public void release() { - if(this.releaseSemaphore != null){ + if (this.releaseSemaphore != null) { this.releaseSemaphore.release(); } } @@ -173,7 +176,7 @@ public class ResponseFuture { /** * scan future table */ - public static void scanFutureTable(){ + public static void scanFutureTable() { final List futureList = new LinkedList<>(); Iterator> it = FUTURE_TABLE.entrySet().iterator(); while (it.hasNext()) { @@ -189,7 +192,7 @@ public class ResponseFuture { try { future.release(); future.executeInvokeCallback(); - } catch (Throwable ex) { + } catch (Exception ex) { LOGGER.warn("scanFutureTable, execute callback error", ex); } } @@ -197,16 +200,16 @@ public class ResponseFuture { @Override public String toString() { - return "ResponseFuture{" + - "opaque=" + opaque + - ", timeoutMillis=" + timeoutMillis + - ", invokeCallback=" + invokeCallback + - ", releaseSemaphore=" + releaseSemaphore + - ", latch=" + latch + - ", beginTimestamp=" + beginTimestamp + - ", responseCommand=" + responseCommand + - ", sendOk=" + sendOk + - ", cause=" + cause + - '}'; + return "ResponseFuture{" + + "opaque=" + opaque + + ", timeoutMillis=" + timeoutMillis + + ", invokeCallback=" + invokeCallback + + ", releaseSemaphore=" + releaseSemaphore + + ", latch=" + latch + + ", beginTimestamp=" + beginTimestamp + + ", responseCommand=" + responseCommand + + ", sendOk=" + sendOk + + ", cause=" + cause + + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index a988acfe17..c057850947 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -74,7 +74,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) { this.nettyRemotingClient = nettyRemotingClient; this.callbackExecutor = callbackExecutor; - this.processors = new ConcurrentHashMap(); + this.processors = new ConcurrentHashMap<>(); } /** @@ -82,10 +82,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { * the current channel has reached the end of its life cycle * * @param ctx channel handler context - * @throws Exception */ @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) { nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } @@ -95,10 +94,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { * * @param ctx channel handler context * @param msg message - * @throws Exception */ @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { processReceived(ctx.channel(), (Command) msg); } @@ -106,7 +104,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); @@ -116,8 +114,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; @@ -138,12 +136,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { future.setResponseCommand(command); future.release(); if (future.getInvokeCallback() != null) { - this.callbackExecutor.submit(new Runnable() { - @Override - public void run() { - future.executeInvokeCallback(); - } - }); + this.callbackExecutor.submit(future::executeInvokeCallback); } else { future.putResponse(command); } @@ -158,7 +151,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { Runnable run = () -> { try { pair.getLeft().process(channel, command); - } catch (Throwable e) { + } catch (Exception e) { logger.error(String.format("process command %s exception", command), e); } }; @@ -175,13 +168,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * caught exception * - * @param ctx channel handler context + * @param ctx channel handler context * @param cause cause - * @throws Exception */ @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("exceptionCaught : {}", cause); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("exceptionCaught : {}", cause.getMessage(), cause); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } @@ -193,11 +185,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { heartBeat.setType(CommandType.HEART_BEAT); heartBeat.setBody(heartBeatData); ctx.writeAndFlush(heartBeat) - .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } -} \ No newline at end of file +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java index 09e41e9b54..322dfae9b1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java @@ -55,7 +55,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * server processors queue */ - private final ConcurrentHashMap> processors = new ConcurrentHashMap(); + private final ConcurrentHashMap> processors = new ConcurrentHashMap<>(); public NettyServerHandler(NettyRemotingServer nettyRemotingServer) { this.nettyRemotingServer = nettyRemotingServer; @@ -66,10 +66,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { * the current channel has reached the end of its life cycle * * @param ctx channel handler context - * @throws Exception */ @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) { ctx.channel().close(); } @@ -78,10 +77,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { * * @param ctx channel handler context * @param msg message - * @throws Exception */ @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { processReceived(ctx.channel(), (Command) msg); } @@ -89,7 +87,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); @@ -99,8 +97,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; @@ -114,21 +112,17 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { * process received logic * * @param channel channel - * @param msg message + * @param msg message */ private void processReceived(final Channel channel, final Command msg) { final CommandType commandType = msg.getType(); final Pair pair = processors.get(commandType); if (pair != null) { - Runnable r = new Runnable() { - - @Override - public void run() { - try { - pair.getLeft().process(channel, msg); - } catch (Throwable ex) { - logger.error("process msg {} error", msg, ex); - } + Runnable r = () -> { + try { + pair.getLeft().process(channel, msg); + } catch (Exception ex) { + logger.error("process msg {} error", msg, ex); } }; try { @@ -144,9 +138,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * caught exception * - * @param ctx channel handler context + * @param ctx channel handler context * @param cause cause - * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { @@ -158,7 +151,6 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { * channel write changed * * @param ctx channel handler context - * @throws Exception */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { @@ -168,14 +160,14 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { if (!ch.isWritable()) { if (logger.isWarnEnabled()) { logger.warn("{} is not writable, over high water level : {}", - ch, config.getWriteBufferHighWaterMark()); + ch, config.getWriteBufferHighWaterMark()); } config.setAutoRead(false); } else { if (logger.isWarnEnabled()) { logger.warn("{} is writable, to low water : {}", - ch, config.getWriteBufferLowWaterMark()); + ch, config.getWriteBufferLowWaterMark()); } config.setAutoRead(true); } @@ -189,4 +181,4 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { super.userEventTriggered(ctx, evt); } } -} \ No newline at end of file +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java index 138a8f0bdf..a1ffb8768e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java @@ -14,43 +14,50 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.remote.utils; -import io.netty.channel.Channel; +package org.apache.dolphinscheduler.remote.utils; import java.net.InetSocketAddress; +import io.netty.channel.Channel; + /** - * channel utils + * channel utils */ public class ChannelUtils { + private ChannelUtils() { + throw new IllegalStateException(ChannelUtils.class.getName()); + } + /** - * get local address + * get local address * * @param channel channel * @return local address */ - public static String getLocalAddress(Channel channel){ - return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress(); + public static String getLocalAddress(Channel channel) { + return ((InetSocketAddress) channel.localAddress()).getAddress().getHostAddress(); } /** - * get remote address + * get remote address + * * @param channel channel * @return remote address */ - public static String getRemoteAddress(Channel channel){ - return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress(); + public static String getRemoteAddress(Channel channel) { + return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress(); } /** - * channel to address + * channel to address + * * @param channel channel * @return address */ - public static Host toAddress(Channel channel){ - InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress()); + public static Host toAddress(Channel channel) { + InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress()); return new Host(socketAddress.getAddress().getHostAddress(), socketAddress.getPort()); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index 866ebb6c2b..e5156ba5b3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -25,6 +25,10 @@ import java.nio.charset.StandardCharsets; */ public class Constants { + private Constants() { + throw new IllegalStateException(Constants.class.getName()); + } + public static final String COMMA = ","; public static final String SLASH = "/"; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java index 2fa82fd5ba..91f61f852d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.remote.utils; +import org.apache.dolphinscheduler.remote.exceptions.RemoteException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,22 +30,24 @@ import java.util.regex.Pattern; public class IPUtils { - private static final Logger logger = LoggerFactory.getLogger(IPUtils.class); + private IPUtils() { + throw new IllegalStateException(IPUtils.class.getName()); + } - private static String IP_REGEX = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}"; + private static final Logger logger = LoggerFactory.getLogger(IPUtils.class); - private static String LOCAL_HOST = "unknown"; + private static String localHost = "unknown"; static { String host = System.getenv("HOSTNAME"); if (isNotEmpty(host)) { - LOCAL_HOST = host; + localHost = host; } else { try { String hostName = InetAddress.getLocalHost().getHostName(); if (isNotEmpty(hostName)) { - LOCAL_HOST = hostName; + localHost = hostName; } } catch (UnknownHostException e) { logger.error("get hostName error!", e); @@ -52,7 +56,7 @@ public class IPUtils { } public static String getLocalHost() { - return LOCAL_HOST; + return localHost; } @@ -100,7 +104,7 @@ public class IPUtils { return addresses; } catch (SocketException e) { - throw new RuntimeException(e.getMessage(), e); + throw new RemoteException(e.getMessage(), e); } } @@ -131,12 +135,11 @@ public class IPUtils { return false; } - Pattern pat = Pattern.compile(IP_REGEX); + String ipRegex = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}"; + Pattern pat = Pattern.compile(ipRegex); Matcher mat = pat.matcher(addr); - boolean ipAddress = mat.find(); - - return ipAddress; + return mat.find(); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java index 33bf8ca7c3..57cec4bbe7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.remote.utils; - /** * key value pair * @@ -51,7 +50,7 @@ public class Pair { this.right = right; } - public static Pair of(L left, R right){ - return new Pair(left, right); + public static Pair of(L left, R right) { + return new Pair<>(left, right); } }