diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index bff54b6c21..1f65208240 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PreDestroy; + /** * log service */ @@ -39,6 +41,17 @@ public class LoggerService { @Autowired private ProcessService processService; + private final LogClientService logClient; + + public LoggerService(){ + logClient = new LogClientService(); + } + + @PreDestroy + public void close(){ + logClient.close(); + } + /** * view log * @@ -64,18 +77,9 @@ public class LoggerService { Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); - LogClientService logClient = null; - try { - logClient = new LogClientService(host, Constants.RPC_PORT); - String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); - result.setData(log); - logger.info(log); - } finally { - if(logClient != null){ - logClient.close(); - } - } - + String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit); + result.setData(log); + logger.info(log); return result; } @@ -90,16 +94,7 @@ public class LoggerService { if (taskInstance == null){ throw new RuntimeException("task instance is null"); } - String host = taskInstance.getHost(); - LogClientService logClient = null; - try { - logClient = new LogClientService(host, Constants.RPC_PORT); - return logClient.getLogBytes(taskInstance.getLogPath()); - } finally { - if(logClient != null){ - logClient.close(); - } - } + return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index df0c13ad38..3dcb8c0825 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -25,17 +25,19 @@ import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException; +import org.apache.dolphinscheduler.remote.future.InvokeCallback; +import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.handler.NettyClientHandler; -import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.remote.utils.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.rmi.RemoteException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -50,51 +52,22 @@ public class NettyRemotingClient { private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class); - /** - * bootstrap - */ private final Bootstrap bootstrap = new Bootstrap(); - /** - * encoder - */ private final NettyEncoder encoder = new NettyEncoder(); - /** - * channels - */ private final ConcurrentHashMap channels = new ConcurrentHashMap(); - /** - * default executor - */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); - /** - * started flag - */ private final AtomicBoolean isStarted = new AtomicBoolean(false); - /** - * worker group - */ private final NioEventLoopGroup workerGroup; - /** - * client handler - */ private final NettyClientHandler clientHandler = new NettyClientHandler(this); - /** - * netty client config - */ private final NettyClientConfig clientConfig; - /** - * netty client init - * - * @param clientConfig client config - */ public NettyRemotingClient(final NettyClientConfig clientConfig){ this.clientConfig = clientConfig; this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { @@ -108,9 +81,6 @@ public class NettyRemotingClient { this.start(); } - /** - * netty server start - */ private void start(){ this.bootstrap @@ -129,37 +99,12 @@ public class NettyRemotingClient { encoder); } }); + // isStarted.compareAndSet(false, true); } - /** - * register processor - * - * @param commandType command type - * @param processor processor - */ - public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { - registerProcessor(commandType, processor, null); - } - - /** - * register processor - * - * @param commandType command type - * @param processor processor - * @param executor thread executor - */ - public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { - this.clientHandler.registerProcessor(commandType, processor, executor); - } - - /** - * send connect - * @param address address - * @param command command - * @throws RemotingException - */ - public void send(final Address address, final Command command) throws RemotingException { + //TODO + public void send(final Address address, final Command command, final InvokeCallback invokeCallback) throws RemotingException { final Channel channel = getChannel(address); if (channel == null) { throw new RemotingException("network error"); @@ -182,11 +127,43 @@ public class NettyRemotingClient { } } - /** - * get channel - * @param address address - * @return channel - */ + public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException { + final Channel channel = getChannel(address); + if (channel == null) { + throw new RemotingException(String.format("connect to : %s fail", address)); + } + final long opaque = command.getOpaque(); + try { + final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null); + channel.writeAndFlush(command).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if(channelFuture.isSuccess()){ + responseFuture.setSendOk(true); + return; + } else{ + responseFuture.setSendOk(false); + responseFuture.setCause(channelFuture.cause()); + responseFuture.putResponse(null); + logger.error("send command {} to address {} failed", command, address); + } + } + }); + Command result = responseFuture.waitResponse(); + if(result == null){ + if(responseFuture.isSendOK()){ + throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause()); + } else{ + throw new RemoteException(address.toString(), responseFuture.getCause()); + } + } + return result; + } catch (Exception ex) { + String msg = String.format("send command %s to address %s error", command, address); + throw new RemotingException(msg, ex); + } + } + public Channel getChannel(Address address) { Channel channel = channels.get(address); if(channel != null && channel.isActive()){ @@ -195,12 +172,6 @@ public class NettyRemotingClient { return createChannel(address, true); } - /** - * create channel - * @param address address - * @param isSync is sync - * @return channel - */ public Channel createChannel(Address address, boolean isSync) { ChannelFuture future; try { @@ -221,17 +192,10 @@ public class NettyRemotingClient { return null; } - /** - * get default thread executor - * @return thread executor - */ public ExecutorService getDefaultExecutor() { return defaultExecutor; } - /** - * close client - */ public void close() { if(isStarted.compareAndSet(true, false)){ try { @@ -249,9 +213,6 @@ public class NettyRemotingClient { } } - /** - * close channel - */ private void closeChannels(){ for (Channel channel : this.channels.values()) { channel.close(); @@ -259,11 +220,7 @@ public class NettyRemotingClient { this.channels.clear(); } - /** - * remove channel - * @param address address - */ - public void removeChannel(Address address){ + public void closeChannel(Address address){ Channel channel = this.channels.remove(address); if(channel != null){ channel.close(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java index ee95044764..86ba79c884 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java @@ -16,7 +16,10 @@ */ package org.apache.dolphinscheduler.remote.command; +import com.sun.org.apache.regexp.internal.RE; + import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; /** * receive task log request command and content fill @@ -24,11 +27,12 @@ import java.io.Serializable; */ public class Command implements Serializable { - private static final long serialVersionUID = 1L; + private static final AtomicLong REQUEST_ID = new AtomicLong(1); public static final byte MAGIC = (byte) 0xbabe; public Command(){ + this.opaque = REQUEST_ID.getAndIncrement(); } public Command(long opaque){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index a582221cd3..b3801aceb0 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * application name */ private String applicationName; /** * group name */ private String groupName; /** * task name */ private String taskName; /** * connect port */ private int connectorPort; /** * description info */ private String description; /** * class name */ private String className; /** * method name */ private String methodName; /** * params */ private String params; /** * shard items */ private List shardItems; public List getShardItems() { return shardItems; } public void setShardItems(List shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskRequestCommand implements Serializable { private String taskId; private String attemptId; private String applicationName; private String groupName; private String taskName; private int connectorPort; private String description; private String className; private String methodName; private String params; private List shardItems; public List getShardItems() { return shardItems; } public void setShardItems(List shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java index 0268653b5d..aeb5f7d858 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * execute taks response command */ public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * result info */ private Object result; /** * receive time */ private long receivedTime; /** * execute count */ private int executeCount; /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } /** * package response command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { private String taskId; private String attemptId; private Object result; private long receivedTime; private int executeCount; private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(long opaque){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java index 4f32d5f699..38a8b14871 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java @@ -23,21 +23,11 @@ import io.netty.buffer.Unpooled; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; -/** - * ping machine - */ -public class Ping implements Serializable { - private static final AtomicLong ID = new AtomicLong(1); +public class Ping implements Serializable { - /** - * ping body - */ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; - /** - * request command body - */ private static byte[] EMPTY_BODY_ARRAY = new byte[0]; private static final ByteBuf PING_BUF; @@ -52,21 +42,12 @@ public class Ping implements Serializable { PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); } - /** - * ping connect - * @return result - */ public static ByteBuf pingContent(){ return PING_BUF.duplicate(); } - /** - * package ping command - * - * @return command - */ public static Command create(){ - Command command = new Command(ID.getAndIncrement()); + Command command = new Command(); command.setType(CommandType.PING); command.setBody(EMPTY_BODY_ARRAY); return command; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java index 9b064b7136..088bdd674e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java @@ -29,14 +29,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class GetLogBytesRequestCommand implements Serializable { - /** - * request id - */ - private static final AtomicLong REQUEST = new AtomicLong(1); - - /** - * log path - */ private String path; public GetLogBytesRequestCommand() { @@ -55,12 +47,11 @@ public class GetLogBytesRequestCommand implements Serializable { } /** - * package request command * - * @return command + * @return */ public Command convert2Command(){ - Command command = new Command(REQUEST.getAndIncrement()); + Command command = new Command(); command.setType(CommandType.GET_LOG_BYTES_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java index f072c479f4..339c1a7f95 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java @@ -29,24 +29,10 @@ import java.util.concurrent.atomic.AtomicLong; */ public class RollViewLogRequestCommand implements Serializable { - /** - * request id - */ - private static final AtomicLong REQUEST = new AtomicLong(1); - - /** - * log path - */ private String path; - /** - * skip line number - */ private int skipLineNum; - /** - * query log line number limit - */ private int limit; public RollViewLogRequestCommand() { @@ -82,13 +68,8 @@ public class RollViewLogRequestCommand implements Serializable { this.limit = limit; } - /** - * package request command - * - * @return command - */ public Command convert2Command(){ - Command command = new Command(REQUEST.getAndIncrement()); + Command command = new Command(); command.setType(CommandType.ROLL_VIEW_LOG_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java index 5dcefc6233..69f6009256 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java @@ -29,11 +29,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ViewLogRequestCommand implements Serializable { - /** - * request id - */ - private static final AtomicLong REQUEST = new AtomicLong(1); - private String path; public ViewLogRequestCommand() { @@ -51,13 +46,8 @@ public class ViewLogRequestCommand implements Serializable { this.path = path; } - /** - * package request command - * - * @return command - */ public Command convert2Command(){ - Command command = new Command(REQUEST.getAndIncrement()); + Command command = new Command(); command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java new file mode 100644 index 0000000000..c0d986b063 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java @@ -0,0 +1,20 @@ +package org.apache.dolphinscheduler.remote.exceptions; + +/** + * @Author: Tboy + */ +public class RemotingTimeoutException extends RemotingException{ + + public RemotingTimeoutException(String message) { + super(message); + } + + + public RemotingTimeoutException(String address, long timeoutMillis) { + this(address, timeoutMillis, null); + } + + public RemotingTimeoutException(String address, long timeoutMillis, Throwable cause) { + super(String.format("wait response on the channel %s timeout %s", address, timeoutMillis), cause); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java new file mode 100644 index 0000000000..6ad6a7cbee --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java @@ -0,0 +1,10 @@ +package org.apache.dolphinscheduler.remote.future; + +/** + * @Author: Tboy + */ +public interface InvokeCallback { + + void operationComplete(final ResponseFuture responseFuture); + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java new file mode 100644 index 0000000000..ba2c7e36b8 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java @@ -0,0 +1,101 @@ +package org.apache.dolphinscheduler.remote.future; + +import org.apache.dolphinscheduler.remote.command.Command; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @Author: Tboy + */ +public class ResponseFuture { + + private final static ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); + + private final long opaque; + + private final long timeoutMillis; + + private final InvokeCallback invokeCallback; + + private final CountDownLatch latch = new CountDownLatch(1); + + private final long beginTimestamp = System.currentTimeMillis(); + + private volatile Command responseCommand; + + private volatile boolean sendOk = true; + + private volatile Throwable cause; + + + public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback) { + this.opaque = opaque; + this.timeoutMillis = timeoutMillis; + this.invokeCallback = invokeCallback; + FUTURE_TABLE.put(opaque, this); + } + + public Command waitResponse() throws InterruptedException { + this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + return this.responseCommand; + } + + public void putResponse(final Command responseCommand) { + this.responseCommand = responseCommand; + this.latch.countDown(); + FUTURE_TABLE.remove(opaque); + } + + public static ResponseFuture getFuture(long opaque){ + return FUTURE_TABLE.get(opaque); + } + + public boolean isTimeout() { + long diff = System.currentTimeMillis() - this.beginTimestamp; + return diff > this.timeoutMillis; + } + + public void executeInvokeCallback() { + if (invokeCallback != null) { + invokeCallback.operationComplete(this); + } + } + + public boolean isSendOK() { + return sendOk; + } + + public void setSendOk(boolean sendOk) { + this.sendOk = sendOk; + } + + public void setCause(Throwable cause) { + this.cause = cause; + } + + public Throwable getCause() { + return cause; + } + + public long getOpaque() { + return opaque; + } + + public long getTimeoutMillis() { + return timeoutMillis; + } + + public long getBeginTimestamp() { + return beginTimestamp; + } + + public Command getResponseCommand() { + return responseCommand; + } + + public InvokeCallback getInvokeCallback() { + return invokeCallback; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index 6aceb5a41b..572957f9ab 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -19,17 +19,11 @@ package org.apache.dolphinscheduler.remote.handler; import io.netty.channel.*; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; -import org.apache.dolphinscheduler.remote.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; - /** * netty client request handler */ @@ -38,119 +32,40 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); - /** - * netty remote client - */ private final NettyRemotingClient nettyRemotingClient; - /** - * client processors queue - */ - private final ConcurrentHashMap> processors = new ConcurrentHashMap(); - public NettyClientHandler(NettyRemotingClient nettyRemotingClient){ this.nettyRemotingClient = nettyRemotingClient; } - /** - * When the current channel is not active, - * the current channel has reached the end of its life cycle - * - * @param ctx channel handler context - * @throws Exception - */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); + nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } - /** - * The current channel reads data from the remote - * - * @param ctx channel handler context - * @param msg message - * @throws Exception - */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - processReceived(ctx.channel(), (Command)msg); + processReceived((Command)msg); } - /** - * register processor - * - * @param commandType command type - * @param processor processor - */ - public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { - this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor()); - } - - /** - * register processor - * - * @param commandType command type - * @param processor processor - * @param executor thread executor - */ - public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { - ExecutorService executorRef = executor; - if(executorRef == null){ - executorRef = nettyRemotingClient.getDefaultExecutor(); - } - this.processors.putIfAbsent(commandType, new Pair(processor, executorRef)); - } - - /** - * process received logic - * - * @param channel channel - * @param msg message - */ - private void processReceived(final Channel channel, final Command msg) { - final CommandType commandType = msg.getType(); - final Pair pair = processors.get(commandType); - if (pair != null) { - Runnable r = new Runnable() { - @Override - public void run() { - try { - pair.getLeft().process(channel, msg); - } catch (Throwable ex) { - logger.error("process msg {} error : {}", msg, ex); - } - } - }; - try { - pair.getRight().submit(r); - } catch (RejectedExecutionException e) { - logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel)); - } - } else { - logger.warn("commandType {} not support", commandType); + private void processReceived(final Command responseCommand) { + ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque()); + if(future != null){ + future.putResponse(responseCommand); + future.executeInvokeCallback(); + } else{ + logger.warn("receive response {}, but not matched any request ", responseCommand); } } - /** - * caught exception - * - * @param ctx channel handler context - * @param cause cause - * @throws Exception - */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("exceptionCaught : {}", cause); - nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); + nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } - /** - * channel write changed - * @param ctx channel handler context - * @throws Exception - */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java index 6f0a802af6..1737916903 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java @@ -54,24 +54,14 @@ public class NettyRemotingClientTest { }); server.start(); // - CountDownLatch latch = new CountDownLatch(1); - AtomicLong opaque = new AtomicLong(1); final NettyClientConfig clientConfig = new NettyClientConfig(); NettyRemotingClient client = new NettyRemotingClient(clientConfig); - client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() { - @Override - public void process(Channel channel, Command command) { - opaque.set(command.getOpaque()); - latch.countDown(); - } - }); Command commandPing = Ping.create(); try { - client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing); - latch.await(); + Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000); + Assert.assertEquals(commandPing.getOpaque(), response.getOpaque()); } catch (Exception e) { e.printStackTrace(); } - Assert.assertEquals(opaque.get(), commandPing.getOpaque()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 90711e1d14..e0c00c55d9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -378,8 +378,8 @@ public class ProcessUtils { LogClientService logClient = null; String log = null; try { - logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT); - log = logClient.viewLog(taskInstance.getLogPath()); + logClient = new LogClientService(); + log = logClient.viewLog(taskInstance.getHost(), Constants.RPC_PORT, taskInstance.getLogPath()); } finally { if(logClient != null){ logClient.close(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index aa6999ef0d..3ca2d29782 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -16,13 +16,10 @@ */ package org.apache.dolphinscheduler.service.log; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.log.*; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.slf4j.Logger; @@ -32,7 +29,7 @@ import org.slf4j.LoggerFactory; /** * log client */ -public class LogClientService implements NettyRequestProcessor { +public class LogClientService { private static final Logger logger = LoggerFactory.getLogger(LogClientService.class); @@ -40,8 +37,6 @@ public class LogClientService implements NettyRequestProcessor { private final NettyRemotingClient client; - private final Address address; - /** * request time out */ @@ -49,18 +44,11 @@ public class LogClientService implements NettyRequestProcessor { /** * construct client - * @param host host - * @param port port */ - public LogClientService(String host, int port) { - this.address = new Address(host, port); + public LogClientService() { this.clientConfig = new NettyClientConfig(); - this.clientConfig.setWorkerThreads(1); + this.clientConfig.setWorkerThreads(4); this.client = new NettyRemotingClient(clientConfig); - this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this); - this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this); - this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this); - } /** @@ -73,94 +61,87 @@ public class LogClientService implements NettyRequestProcessor { /** * roll view log + * @param host host + * @param port port * @param path path * @param skipLineNum skip line number * @param limit limit * @return log content */ - public String rollViewLog(String path,int skipLineNum,int limit) { - logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit); + public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) { + logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); String result = ""; + final Address address = new Address(host, port); try { Command command = request.convert2Command(); - this.client.send(address, command); - LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); - result = ((String)promise.getResult()); + Command response = this.client.sendSync(address, command, logRequestTimeout); + if(response != null){ + RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize( + command.getBody(), RollViewLogResponseCommand.class); + return rollReviewLog.getMsg(); + } } catch (Exception e) { logger.error("roll view log error", e); + } finally { + this.client.closeChannel(address); } return result; } /** * view log + * @param host host + * @param port port * @param path path * @return log content */ - public String viewLog(String path) { + public String viewLog(String host, int port, String path) { logger.info("view log path {}", path); ViewLogRequestCommand request = new ViewLogRequestCommand(path); String result = ""; + final Address address = new Address(host, port); try { Command command = request.convert2Command(); - this.client.send(address, command); - LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); - result = ((String)promise.getResult()); + Command response = this.client.sendSync(address, command, logRequestTimeout); + if(response != null){ + ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize( + response.getBody(), ViewLogResponseCommand.class); + return viewLog.getMsg(); + } } catch (Exception e) { logger.error("view log error", e); + } finally { + this.client.closeChannel(address); } return result; } /** * get log size + * @param host host + * @param port port * @param path log path * @return log content bytes */ - public byte[] getLogBytes(String path) { + public byte[] getLogBytes(String host, int port, String path) { logger.info("log path {}", path); GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path); byte[] result = null; + final Address address = new Address(host, port); try { Command command = request.convert2Command(); - this.client.send(address, command); - LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); - result = (byte[])promise.getResult(); + Command response = this.client.sendSync(address, command, logRequestTimeout); + if(response != null){ + GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize( + response.getBody(), GetLogBytesResponseCommand.class); + return getLog.getData(); + } } catch (Exception e) { logger.error("get log size error", e); + } finally { + this.client.closeChannel(address); } return result; } - - @Override - public void process(Channel channel, Command command) { - logger.info("received log response : {}", command); - switch (command.getType()){ - case ROLL_VIEW_LOG_RESPONSE: - RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize( - command.getBody(), RollViewLogResponseCommand.class); - LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg()); - break; - case VIEW_WHOLE_LOG_RESPONSE: - ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize( - command.getBody(), ViewLogResponseCommand.class); - LogPromise.notify(command.getOpaque(), viewLog.getMsg()); - break; - case GET_LOG_BYTES_RESPONSE: - GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize( - command.getBody(), GetLogBytesResponseCommand.class); - LogPromise.notify(command.getOpaque(), getLog.getData()); - break; - default: - throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType())); - } - } - - public static void main(String[] args) throws Exception{ - LogClientService logClient = new LogClientService("192.168.220.247", 50051); - byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log"); - System.out.println(new String(logBytes)); - } - } \ No newline at end of file