Browse Source

# Conflicts resolve :

#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
#	dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
pull/2/head
qiaozhanwei 5 years ago
parent
commit
b2e0b8598f
  1. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  2. 133
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  3. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
  4. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
  5. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
  6. 23
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
  7. 13
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
  8. 21
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
  9. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
  10. 20
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java
  11. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
  12. 101
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
  13. 107
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
  14. 14
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
  15. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  16. 99
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
/**
* log service
*/
@ -39,6 +41,17 @@ public class LoggerService {
@Autowired
private ProcessService processService;
private final LogClientService logClient;
public LoggerService(){
logClient = new LogClientService();
}
@PreDestroy
public void close(){
logClient.close();
}
/**
* view log
*
@ -64,18 +77,9 @@ public class LoggerService {
Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
LogClientService logClient = null;
try {
logClient = new LogClientService(host, Constants.RPC_PORT);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
} finally {
if(logClient != null){
logClient.close();
}
}
String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
return result;
}
@ -90,16 +94,7 @@ public class LoggerService {
if (taskInstance == null){
throw new RuntimeException("task instance is null");
}
String host = taskInstance.getHost();
LogClientService logClient = null;
try {
logClient = new LogClientService(host, Constants.RPC_PORT);
return logClient.getLogBytes(taskInstance.getLogPath());
} finally {
if(logClient != null){
logClient.close();
}
}
return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
}
}

133
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -25,17 +25,19 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.rmi.RemoteException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -50,51 +52,22 @@ public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
/**
* bootstrap
*/
private final Bootstrap bootstrap = new Bootstrap();
/**
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();
/**
* channels
*/
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
/**
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
/**
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* worker group
*/
private final NioEventLoopGroup workerGroup;
/**
* client handler
*/
private final NettyClientHandler clientHandler = new NettyClientHandler(this);
/**
* netty client config
*/
private final NettyClientConfig clientConfig;
/**
* netty client init
*
* @param clientConfig client config
*/
public NettyRemotingClient(final NettyClientConfig clientConfig){
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
@ -108,9 +81,6 @@ public class NettyRemotingClient {
this.start();
}
/**
* netty server start
*/
private void start(){
this.bootstrap
@ -129,37 +99,12 @@ public class NettyRemotingClient {
encoder);
}
});
//
isStarted.compareAndSet(false, true);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
registerProcessor(commandType, processor, null);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor);
}
/**
* send connect
* @param address address
* @param command command
* @throws RemotingException
*/
public void send(final Address address, final Command command) throws RemotingException {
//TODO
public void send(final Address address, final Command command, final InvokeCallback invokeCallback) throws RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException("network error");
@ -182,11 +127,43 @@ public class NettyRemotingClient {
}
}
/**
* get channel
* @param address address
* @return channel
*/
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", address));
}
final long opaque = command.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null);
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
responseFuture.setSendOk(true);
return;
} else{
responseFuture.setSendOk(false);
responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null);
logger.error("send command {} to address {} failed", command, address);
}
}
});
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
} else{
throw new RemoteException(address.toString(), responseFuture.getCause());
}
}
return result;
} catch (Exception ex) {
String msg = String.format("send command %s to address %s error", command, address);
throw new RemotingException(msg, ex);
}
}
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
if(channel != null && channel.isActive()){
@ -195,12 +172,6 @@ public class NettyRemotingClient {
return createChannel(address, true);
}
/**
* create channel
* @param address address
* @param isSync is sync
* @return channel
*/
public Channel createChannel(Address address, boolean isSync) {
ChannelFuture future;
try {
@ -221,17 +192,10 @@ public class NettyRemotingClient {
return null;
}
/**
* get default thread executor
* @return thread executor
*/
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
/**
* close client
*/
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
@ -249,9 +213,6 @@ public class NettyRemotingClient {
}
}
/**
* close channel
*/
private void closeChannels(){
for (Channel channel : this.channels.values()) {
channel.close();
@ -259,11 +220,7 @@ public class NettyRemotingClient {
this.channels.clear();
}
/**
* remove channel
* @param address address
*/
public void removeChannel(Address address){
public void closeChannel(Address address){
Channel channel = this.channels.remove(address);
if(channel != null){
channel.close();

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java

@ -16,7 +16,10 @@
*/
package org.apache.dolphinscheduler.remote.command;
import com.sun.org.apache.regexp.internal.RE;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* receive task log request command and content fill
@ -24,11 +27,12 @@ import java.io.Serializable;
*/
public class Command implements Serializable {
private static final long serialVersionUID = 1L;
private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe;
public Command(){
this.opaque = REQUEST_ID.getAndIncrement();
}
public Command(long opaque){

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ cm">/** * execute task request command /** cm">/** */ */ cm">/** public class ExecuteTaskRequestCommand implements Serializable { /** * group name */ private String groupName; /** * task name */ private String taskName; /** * connect port */ * execute task request command /** * execute task request command cm">/** */ * execute task request command * execute task request command /** * execute task request command */ */ * execute task request command public class ExecuteTaskRequestCommand implements Serializable { /** * execute task request command */ * execute task request command private static final AtomicLong REQUEST = new AtomicLong(1); /** * execute task request command /** */ * execute task request command * task id /** * execute task request command */ */ > private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; */ cm">/** */ * execute task request command */ */ */ * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { */ * execute task request command */ */ * execute task request command private String groupName; */ * execute task request command */ /** */ * execute task request command private int connectorPort; */ * execute task request command */ */ */ * execute task request command private String className; */ * execute task request command private String methodName; */ * execute task request command public class ExecuteTaskRequestCommand implements Serializable { * execute task request command public class ExecuteTaskRequestCommand implements Serializable { */ > private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * execute taks response command */ public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ cm">/** * execute taks response command /** * result info */ cm">/** public class ExecuteTaskResponseCommand implements Serializable { /** cm">/** */ cm">/** private static final AtomicLong REQUEST = new AtomicLong(1); /** * execute count */ private int executeCount; /** * execute time * execute taks response command > private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } /** * package response command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; * execute taks response command cm">/** * execute taks response command * execute taks response command * execute taks response command */ * execute taks response command public class ExecuteTaskResponseCommand implements Serializable { * execute taks response command * execute taks response command private static final AtomicLong REQUEST = new AtomicLong(1); * execute taks response command /** * execute taks response command * task id > private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(long opaque){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

23
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java

@ -23,21 +23,11 @@ import io.netty.buffer.Unpooled;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* ping machine
*/
public class Ping implements Serializable {
private static final AtomicLong ID = new AtomicLong(1);
public class Ping implements Serializable {
/**
* ping body
*/
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
/**
* request command body
*/
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
private static final ByteBuf PING_BUF;
@ -52,21 +42,12 @@ public class Ping implements Serializable {
PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
}
/**
* ping connect
* @return result
*/
public static ByteBuf pingContent(){
return PING_BUF.duplicate();
}
/**
* package ping command
*
* @return command
*/
public static Command create(){
Command command = new Command(ID.getAndIncrement());
Command command = new Command();
command.setType(CommandType.PING);
command.setBody(EMPTY_BODY_ARRAY);
return command;

13
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java

@ -29,14 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class GetLogBytesRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
/**
* log path
*/
private String path;
public GetLogBytesRequestCommand() {
@ -55,12 +47,11 @@ public class GetLogBytesRequestCommand implements Serializable {
}
/**
* package request command
*
* @return command
* @return
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
Command command = new Command();
command.setType(CommandType.GET_LOG_BYTES_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);

21
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java

@ -29,24 +29,10 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class RollViewLogRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
/**
* log path
*/
private String path;
/**
* skip line number
*/
private int skipLineNum;
/**
* query log line number limit
*/
private int limit;
public RollViewLogRequestCommand() {
@ -82,13 +68,8 @@ public class RollViewLogRequestCommand implements Serializable {
this.limit = limit;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
Command command = new Command();
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java

@ -29,11 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ViewLogRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path;
public ViewLogRequestCommand() {
@ -51,13 +46,8 @@ public class ViewLogRequestCommand implements Serializable {
this.path = path;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);

20
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java

@ -0,0 +1,20 @@
package org.apache.dolphinscheduler.remote.exceptions;
/**
* @Author: Tboy
*/
public class RemotingTimeoutException extends RemotingException{
public RemotingTimeoutException(String message) {
super(message);
}
public RemotingTimeoutException(String address, long timeoutMillis) {
this(address, timeoutMillis, null);
}
public RemotingTimeoutException(String address, long timeoutMillis, Throwable cause) {
super(String.format("wait response on the channel %s timeout %s", address, timeoutMillis), cause);
}
}

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java

@ -0,0 +1,10 @@
package org.apache.dolphinscheduler.remote.future;
/**
* @Author: Tboy
*/
public interface InvokeCallback {
void operationComplete(final ResponseFuture responseFuture);
}

101
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java

@ -0,0 +1,101 @@
package org.apache.dolphinscheduler.remote.future;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Author: Tboy
*/
public class ResponseFuture {
private final static ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
private final long opaque;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final CountDownLatch latch = new CountDownLatch(1);
private final long beginTimestamp = System.currentTimeMillis();
private volatile Command responseCommand;
private volatile boolean sendOk = true;
private volatile Throwable cause;
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback) {
this.opaque = opaque;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
FUTURE_TABLE.put(opaque, this);
}
public Command waitResponse() throws InterruptedException {
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final Command responseCommand) {
this.responseCommand = responseCommand;
this.latch.countDown();
FUTURE_TABLE.remove(opaque);
}
public static ResponseFuture getFuture(long opaque){
return FUTURE_TABLE.get(opaque);
}
public boolean isTimeout() {
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
public void executeInvokeCallback() {
if (invokeCallback != null) {
invokeCallback.operationComplete(this);
}
}
public boolean isSendOK() {
return sendOk;
}
public void setSendOk(boolean sendOk) {
this.sendOk = sendOk;
}
public void setCause(Throwable cause) {
this.cause = cause;
}
public Throwable getCause() {
return cause;
}
public long getOpaque() {
return opaque;
}
public long getTimeoutMillis() {
return timeoutMillis;
}
public long getBeginTimestamp() {
return beginTimestamp;
}
public Command getResponseCommand() {
return responseCommand;
}
public InvokeCallback getInvokeCallback() {
return invokeCallback;
}
}

107
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java

@ -19,17 +19,11 @@ package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
/**
* netty client request handler
*/
@ -38,119 +32,40 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/**
* netty remote client
*/
private final NettyRemotingClient nettyRemotingClient;
/**
* client processors queue
*/
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
this.nettyRemotingClient = nettyRemotingClient;
}
/**
* When the current channel is not active,
* the current channel has reached the end of its life cycle
*
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
/**
* The current channel reads data from the remote
*
* @param ctx channel handler context
* @param msg message
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg);
processReceived((Command)msg);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if(executorRef == null){
executorRef = nettyRemotingClient.getDefaultExecutor();
}
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
}
/**
* process received logic
*
* @param channel channel
* @param msg message
*/
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) {
Runnable r = new Runnable() {
@Override
public void run() {
try {
pair.getLeft().process(channel, msg);
} catch (Throwable ex) {
logger.error("process msg {} error : {}", msg, ex);
}
}
};
try {
pair.getRight().submit(r);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
}
} else {
logger.warn("commandType {} not support", commandType);
private void processReceived(final Command responseCommand) {
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
if(future != null){
future.putResponse(responseCommand);
future.executeInvokeCallback();
} else{
logger.warn("receive response {}, but not matched any request ", responseCommand);
}
}
/**
* caught exception
*
* @param ctx channel handler context
* @param cause cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
/**
* channel write changed
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();

14
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java

@ -54,24 +54,14 @@ public class NettyRemotingClientTest {
});
server.start();
//
CountDownLatch latch = new CountDownLatch(1);
AtomicLong opaque = new AtomicLong(1);
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
@Override
public void process(Channel channel, Command command) {
opaque.set(command.getOpaque());
latch.countDown();
}
});
Command commandPing = Ping.create();
try {
client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
latch.await();
Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
} catch (Exception e) {
e.printStackTrace();
}
Assert.assertEquals(opaque.get(), commandPing.getOpaque());
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -378,8 +378,8 @@ public class ProcessUtils {
LogClientService logClient = null;
String log = null;
try {
logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
log = logClient.viewLog(taskInstance.getLogPath());
logClient = new LogClientService();
log = logClient.viewLog(taskInstance.getHost(), Constants.RPC_PORT, taskInstance.getLogPath());
} finally {
if(logClient != null){
logClient.close();

99
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -16,13 +16,10 @@
*/
package org.apache.dolphinscheduler.service.log;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger;
@ -32,7 +29,7 @@ import org.slf4j.LoggerFactory;
/**
* log client
*/
public class LogClientService implements NettyRequestProcessor {
public class LogClientService {
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
@ -40,8 +37,6 @@ public class LogClientService implements NettyRequestProcessor {
private final NettyRemotingClient client;
private final Address address;
/**
* request time out
*/
@ -49,18 +44,11 @@ public class LogClientService implements NettyRequestProcessor {
/**
* construct client
* @param host host
* @param port port
*/
public LogClientService(String host, int port) {
this.address = new Address(host, port);
public LogClientService() {
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(1);
this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig);
this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
}
/**
@ -73,94 +61,87 @@ public class LogClientService implements NettyRequestProcessor {
/**
* roll view log
* @param host host
* @param port port
* @param path path
* @param skipLineNum skip line number
* @param limit limit
* @return log content
*/
public String rollViewLog(String path,int skipLineNum,int limit) {
logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
final Address address = new Address(host, port);
try {
Command command = request.convert2Command();
this.client.send(address, command);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
result = ((String)promise.getResult());
Command response = this.client.sendSync(address, command, logRequestTimeout);
if(response != null){
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
command.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
}
} catch (Exception e) {
logger.error("roll view log error", e);
} finally {
this.client.closeChannel(address);
}
return result;
}
/**
* view log
* @param host host
* @param port port
* @param path path
* @return log content
*/
public String viewLog(String path) {
public String viewLog(String host, int port, String path) {
logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
final Address address = new Address(host, port);
try {
Command command = request.convert2Command();
this.client.send(address, command);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
result = ((String)promise.getResult());
Command response = this.client.sendSync(address, command, logRequestTimeout);
if(response != null){
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
response.getBody(), ViewLogResponseCommand.class);
return viewLog.getMsg();
}
} catch (Exception e) {
logger.error("view log error", e);
} finally {
this.client.closeChannel(address);
}
return result;
}
/**
* get log size
* @param host host
* @param port port
* @param path log path
* @return log content bytes
*/
public byte[] getLogBytes(String path) {
public byte[] getLogBytes(String host, int port, String path) {
logger.info("log path {}", path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null;
final Address address = new Address(host, port);
try {
Command command = request.convert2Command();
this.client.send(address, command);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
result = (byte[])promise.getResult();
Command response = this.client.sendSync(address, command, logRequestTimeout);
if(response != null){
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData();
}
} catch (Exception e) {
logger.error("get log size error", e);
} finally {
this.client.closeChannel(address);
}
return result;
}
@Override
public void process(Channel channel, Command command) {
logger.info("received log response : {}", command);
switch (command.getType()){
case ROLL_VIEW_LOG_RESPONSE:
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
command.getBody(), RollViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
break;
case VIEW_WHOLE_LOG_RESPONSE:
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
command.getBody(), ViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), viewLog.getMsg());
break;
case GET_LOG_BYTES_RESPONSE:
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesResponseCommand.class);
LogPromise.notify(command.getOpaque(), getLog.getData());
break;
default:
throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
}
}
public static void main(String[] args) throws Exception{
LogClientService logClient = new LogClientService("192.168.220.247", 50051);
byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
System.out.println(new String(logBytes));
}
}
Loading…
Cancel
Save