From 8e154ecbe46035d9338371d4a8d8bea9fccc81d3 Mon Sep 17 00:00:00 2001 From: Technoboy- Date: Thu, 13 Feb 2020 22:06:34 +0800 Subject: [PATCH] refactor log client service --- .../api/service/LoggerService.java | 38 ++++++++---------- .../remote/NettyRemotingClient.java | 17 +++++++- .../remote/future/InvokeCallback.java | 10 +++++ .../remote/future/ResponseFuture.java | 29 ++++++++++++++ .../remote/handler/NettyClientHandler.java | 4 +- .../server/utils/ProcessUtils.java | 4 +- .../service/log/LogClientService.java | 40 ++++++++++--------- 7 files changed, 97 insertions(+), 45 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index bff54b6c21..f20f6574c3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PreDestroy; + /** * log service */ @@ -39,6 +41,17 @@ public class LoggerService { @Autowired private ProcessService processService; + private final LogClientService logClient; + + public LoggerService(){ + logClient = new LogClientService(); + } + + @PreDestroy + public void close(){ + logClient.close(); + } + /** * view log * @@ -64,17 +77,9 @@ public class LoggerService { Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); - LogClientService logClient = null; - try { - logClient = new LogClientService(host, Constants.RPC_PORT); - String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); - result.setData(log); - logger.info(log); - } finally { - if(logClient != null){ - logClient.close(); - } - } + String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit); + result.setData(log); + logger.info(log); return result; } @@ -90,16 +95,7 @@ public class LoggerService { if (taskInstance == null){ throw new RuntimeException("task instance is null"); } - String host = taskInstance.getHost(); - LogClientService logClient = null; - try { - logClient = new LogClientService(host, Constants.RPC_PORT); - return logClient.getLogBytes(taskInstance.getLogPath()); - } finally { - if(logClient != null){ - logClient.close(); - } - } + return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 678fe84f90..f8357a388a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -132,6 +132,21 @@ public class NettyRemotingClient { } } + //TODO + public void sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException { + final Channel channel = getChannel(address); + if (channel == null) { + throw new RemotingException("network error"); + } + final long opaque = command.getOpaque(); + try { + + } catch (Exception ex) { + String msg = String.format("send command %s to address %s encounter error", command, address); + throw new RemotingException(msg, ex); + } + } + public Channel getChannel(Address address) { Channel channel = channels.get(address); if(channel != null && channel.isActive()){ @@ -188,7 +203,7 @@ public class NettyRemotingClient { this.channels.clear(); } - public void removeChannel(Address address){ + public void closeChannel(Address address){ Channel channel = this.channels.remove(address); if(channel != null){ channel.close(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java new file mode 100644 index 0000000000..6ad6a7cbee --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java @@ -0,0 +1,10 @@ +package org.apache.dolphinscheduler.remote.future; + +/** + * @Author: Tboy + */ +public interface InvokeCallback { + + void operationComplete(final ResponseFuture responseFuture); + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java new file mode 100644 index 0000000000..036f9906b1 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java @@ -0,0 +1,29 @@ +package org.apache.dolphinscheduler.remote.future; + +import org.apache.dolphinscheduler.remote.command.Command; + +import java.util.concurrent.CountDownLatch; + +/** + * @Author: Tboy + */ +public class ResponseFuture { + + private final int opaque; + + private final long timeoutMillis; + + private final InvokeCallback invokeCallback; + + private final long beginTimestamp = System.currentTimeMillis(); + + private final CountDownLatch latch = new CountDownLatch(1); + + public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback) { + this.opaque = opaque; + this.timeoutMillis = timeoutMillis; + this.invokeCallback = invokeCallback; + } + + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index b06308090f..115f6e4953 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -48,7 +48,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); + nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } @@ -96,7 +96,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("exceptionCaught : {}", cause); - nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); + nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 90711e1d14..e0c00c55d9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -378,8 +378,8 @@ public class ProcessUtils { LogClientService logClient = null; String log = null; try { - logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT); - log = logClient.viewLog(taskInstance.getLogPath()); + logClient = new LogClientService(); + log = logClient.viewLog(taskInstance.getHost(), Constants.RPC_PORT, taskInstance.getLogPath()); } finally { if(logClient != null){ logClient.close(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index aa6999ef0d..4faff335fe 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -40,8 +40,6 @@ public class LogClientService implements NettyRequestProcessor { private final NettyRemotingClient client; - private final Address address; - /** * request time out */ @@ -49,18 +47,14 @@ public class LogClientService implements NettyRequestProcessor { /** * construct client - * @param host host - * @param port port */ - public LogClientService(String host, int port) { - this.address = new Address(host, port); + public LogClientService() { this.clientConfig = new NettyClientConfig(); - this.clientConfig.setWorkerThreads(1); + this.clientConfig.setWorkerThreads(4); this.client = new NettyRemotingClient(clientConfig); this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this); this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this); this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this); - } /** @@ -73,15 +67,18 @@ public class LogClientService implements NettyRequestProcessor { /** * roll view log + * @param host host + * @param port port * @param path path * @param skipLineNum skip line number * @param limit limit * @return log content */ - public String rollViewLog(String path,int skipLineNum,int limit) { - logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit); + public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) { + logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); String result = ""; + final Address address = new Address(host, port); try { Command command = request.convert2Command(); this.client.send(address, command); @@ -89,19 +86,24 @@ public class LogClientService implements NettyRequestProcessor { result = ((String)promise.getResult()); } catch (Exception e) { logger.error("roll view log error", e); + } finally { + this.client.closeChannel(address); } return result; } /** * view log + * @param host host + * @param port port * @param path path * @return log content */ - public String viewLog(String path) { + public String viewLog(String host, int port, String path) { logger.info("view log path {}", path); ViewLogRequestCommand request = new ViewLogRequestCommand(path); String result = ""; + final Address address = new Address(host, port); try { Command command = request.convert2Command(); this.client.send(address, command); @@ -109,19 +111,24 @@ public class LogClientService implements NettyRequestProcessor { result = ((String)promise.getResult()); } catch (Exception e) { logger.error("view log error", e); + } finally { + this.client.closeChannel(address); } return result; } /** * get log size + * @param host host + * @param port port * @param path log path * @return log content bytes */ - public byte[] getLogBytes(String path) { + public byte[] getLogBytes(String host, int port, String path) { logger.info("log path {}", path); GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path); byte[] result = null; + final Address address = new Address(host, port); try { Command command = request.convert2Command(); this.client.send(address, command); @@ -129,6 +136,8 @@ public class LogClientService implements NettyRequestProcessor { result = (byte[])promise.getResult(); } catch (Exception e) { logger.error("get log size error", e); + } finally { + this.client.closeChannel(address); } return result; } @@ -156,11 +165,4 @@ public class LogClientService implements NettyRequestProcessor { throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType())); } } - - public static void main(String[] args) throws Exception{ - LogClientService logClient = new LogClientService("192.168.220.247", 50051); - byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log"); - System.out.println(new String(logBytes)); - } - } \ No newline at end of file