Browse Source

refactor log client service

pull/2/head
Technoboy- 4 years ago
parent
commit
8e154ecbe4
  1. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  2. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  3. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
  4. 29
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
  5. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
  6. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  7. 40
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
/** /**
* log service * log service
*/ */
@ -39,6 +41,17 @@ public class LoggerService {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
private final LogClientService logClient;
public LoggerService(){
logClient = new LogClientService();
}
@PreDestroy
public void close(){
logClient.close();
}
/** /**
* view log * view log
* *
@ -64,17 +77,9 @@ public class LoggerService {
Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
LogClientService logClient = null; String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit);
try { result.setData(log);
logClient = new LogClientService(host, Constants.RPC_PORT); logger.info(log);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
} finally {
if(logClient != null){
logClient.close();
}
}
return result; return result;
} }
@ -90,16 +95,7 @@ public class LoggerService {
if (taskInstance == null){ if (taskInstance == null){
throw new RuntimeException("task instance is null"); throw new RuntimeException("task instance is null");
} }
String host = taskInstance.getHost(); String host = taskInstance.getHost();
LogClientService logClient = null; return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
try {
logClient = new LogClientService(host, Constants.RPC_PORT);
return logClient.getLogBytes(taskInstance.getLogPath());
} finally {
if(logClient != null){
logClient.close();
}
}
} }
} }

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -132,6 +132,21 @@ public class NettyRemotingClient {
} }
} }
//TODO
public void sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException("network error");
}
final long opaque = command.getOpaque();
try {
} catch (Exception ex) {
String msg = String.format("send command %s to address %s encounter error", command, address);
throw new RemotingException(msg, ex);
}
}
public Channel getChannel(Address address) { public Channel getChannel(Address address) {
Channel channel = channels.get(address); Channel channel = channels.get(address);
if(channel != null && channel.isActive()){ if(channel != null && channel.isActive()){
@ -188,7 +203,7 @@ public class NettyRemotingClient {
this.channels.clear(); this.channels.clear();
} }
public void removeChannel(Address address){ public void closeChannel(Address address){
Channel channel = this.channels.remove(address); Channel channel = this.channels.remove(address);
if(channel != null){ if(channel != null){
channel.close(); channel.close();

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java

@ -0,0 +1,10 @@
package org.apache.dolphinscheduler.remote.future;
/**
* @Author: Tboy
*/
public interface InvokeCallback {
void operationComplete(final ResponseFuture responseFuture);
}

29
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java

@ -0,0 +1,29 @@
package org.apache.dolphinscheduler.remote.future;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.concurrent.CountDownLatch;
/**
* @Author: Tboy
*/
public class ResponseFuture {
private final int opaque;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch latch = new CountDownLatch(1);
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback) {
this.opaque = opaque;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
}
}

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java

@ -48,7 +48,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }
@ -96,7 +96,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause); logger.error("exceptionCaught : {}", cause);
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -378,8 +378,8 @@ public class ProcessUtils {
LogClientService logClient = null; LogClientService logClient = null;
String log = null; String log = null;
try { try {
logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT); logClient = new LogClientService();
log = logClient.viewLog(taskInstance.getLogPath()); log = logClient.viewLog(taskInstance.getHost(), Constants.RPC_PORT, taskInstance.getLogPath());
} finally { } finally {
if(logClient != null){ if(logClient != null){
logClient.close(); logClient.close();

40
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -40,8 +40,6 @@ public class LogClientService implements NettyRequestProcessor {
private final NettyRemotingClient client; private final NettyRemotingClient client;
private final Address address;
/** /**
* request time out * request time out
*/ */
@ -49,18 +47,14 @@ public class LogClientService implements NettyRequestProcessor {
/** /**
* construct client * construct client
* @param host host
* @param port port
*/ */
public LogClientService(String host, int port) { public LogClientService() {
this.address = new Address(host, port);
this.clientConfig = new NettyClientConfig(); this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(1); this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig); this.client = new NettyRemotingClient(clientConfig);
this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this); this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this); this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this); this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
} }
/** /**
@ -73,15 +67,18 @@ public class LogClientService implements NettyRequestProcessor {
/** /**
* roll view log * roll view log
* @param host host
* @param port port
* @param path path * @param path path
* @param skipLineNum skip line number * @param skipLineNum skip line number
* @param limit limit * @param limit limit
* @return log content * @return log content
*/ */
public String rollViewLog(String path,int skipLineNum,int limit) { public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) {
logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit); logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = ""; String result = "";
final Address address = new Address(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
this.client.send(address, command); this.client.send(address, command);
@ -89,19 +86,24 @@ public class LogClientService implements NettyRequestProcessor {
result = ((String)promise.getResult()); result = ((String)promise.getResult());
} catch (Exception e) { } catch (Exception e) {
logger.error("roll view log error", e); logger.error("roll view log error", e);
} finally {
this.client.closeChannel(address);
} }
return result; return result;
} }
/** /**
* view log * view log
* @param host host
* @param port port
* @param path path * @param path path
* @return log content * @return log content
*/ */
public String viewLog(String path) { public String viewLog(String host, int port, String path) {
logger.info("view log path {}", path); logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path); ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = ""; String result = "";
final Address address = new Address(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
this.client.send(address, command); this.client.send(address, command);
@ -109,19 +111,24 @@ public class LogClientService implements NettyRequestProcessor {
result = ((String)promise.getResult()); result = ((String)promise.getResult());
} catch (Exception e) { } catch (Exception e) {
logger.error("view log error", e); logger.error("view log error", e);
} finally {
this.client.closeChannel(address);
} }
return result; return result;
} }
/** /**
* get log size * get log size
* @param host host
* @param port port
* @param path log path * @param path log path
* @return log content bytes * @return log content bytes
*/ */
public byte[] getLogBytes(String path) { public byte[] getLogBytes(String host, int port, String path) {
logger.info("log path {}", path); logger.info("log path {}", path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path); GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null; byte[] result = null;
final Address address = new Address(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
this.client.send(address, command); this.client.send(address, command);
@ -129,6 +136,8 @@ public class LogClientService implements NettyRequestProcessor {
result = (byte[])promise.getResult(); result = (byte[])promise.getResult();
} catch (Exception e) { } catch (Exception e) {
logger.error("get log size error", e); logger.error("get log size error", e);
} finally {
this.client.closeChannel(address);
} }
return result; return result;
} }
@ -156,11 +165,4 @@ public class LogClientService implements NettyRequestProcessor {
throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType())); throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
} }
} }
public static void main(String[] args) throws Exception{
LogClientService logClient = new LogClientService("192.168.220.247", 50051);
byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
System.out.println(new String(logBytes));
}
} }
Loading…
Cancel
Save