diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index bff54b6c21..1f65208240 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.annotation.PreDestroy;
+
/**
* log service
*/
@@ -39,6 +41,17 @@ public class LoggerService {
@Autowired
private ProcessService processService;
+ private final LogClientService logClient;
+
+ public LoggerService(){
+ logClient = new LogClientService();
+ }
+
+ @PreDestroy
+ public void close(){
+ logClient.close();
+ }
+
/**
* view log
*
@@ -64,18 +77,9 @@ public class LoggerService {
Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
- LogClientService logClient = null;
- try {
- logClient = new LogClientService(host, Constants.RPC_PORT);
- String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
- result.setData(log);
- logger.info(log);
- } finally {
- if(logClient != null){
- logClient.close();
- }
- }
-
+ String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit);
+ result.setData(log);
+ logger.info(log);
return result;
}
@@ -90,16 +94,7 @@ public class LoggerService {
if (taskInstance == null){
throw new RuntimeException("task instance is null");
}
-
String host = taskInstance.getHost();
- LogClientService logClient = null;
- try {
- logClient = new LogClientService(host, Constants.RPC_PORT);
- return logClient.getLogBytes(taskInstance.getLogPath());
- } finally {
- if(logClient != null){
- logClient.close();
- }
- }
+ return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
}
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index df0c13ad38..357fd6d19d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -25,21 +25,22 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException;
+import org.apache.dolphinscheduler.remote.future.InvokeCallback;
+import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
+import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
-import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
-import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,7 +52,7 @@ public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
/**
- * bootstrap
+ * client bootstrap
*/
private final Bootstrap bootstrap = new Bootstrap();
@@ -61,14 +62,9 @@ public class NettyRemotingClient {
private final NettyEncoder encoder = new NettyEncoder();
/**
- * channels
+ * channels
*/
- private final ConcurrentHashMap
channels = new ConcurrentHashMap();
-
- /**
- * default executor
- */
- private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+ private final ConcurrentHashMap channels = new ConcurrentHashMap(128);
/**
* started flag
@@ -81,18 +77,27 @@ public class NettyRemotingClient {
private final NioEventLoopGroup workerGroup;
/**
- * client handler
+ * client config
*/
- private final NettyClientHandler clientHandler = new NettyClientHandler(this);
+ private final NettyClientConfig clientConfig;
/**
- * netty client config
+ * saync semaphore
*/
- private final NettyClientConfig clientConfig;
+ private final Semaphore asyncSemaphore = new Semaphore(200, true);
/**
- * netty client init
- *
+ * callback thread executor
+ */
+ private final ExecutorService callbackExecutor;
+
+ /**
+ * client handler
+ */
+ private final NettyClientHandler clientHandler;
+
+ /**
+ * client init
* @param clientConfig client config
*/
public NettyRemotingClient(final NettyClientConfig clientConfig){
@@ -105,11 +110,16 @@ public class NettyRemotingClient {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
+ this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
+ new CallerThreadExecutePolicy());
+ this.clientHandler = new NettyClientHandler(this, callbackExecutor);
+
this.start();
}
/**
- * netty server start
+ * start
*/
private void start(){
@@ -129,63 +139,125 @@ public class NettyRemotingClient {
encoder);
}
});
+ //
isStarted.compareAndSet(false, true);
}
/**
- * register processor
- *
- * @param commandType command type
- * @param processor processor
+ * async send
+ * @param address address
+ * @param command command
+ * @param timeoutMillis timeoutMillis
+ * @param invokeCallback callback function
+ * @throws InterruptedException
+ * @throws RemotingException
*/
- public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
- registerProcessor(commandType, processor, null);
- }
+ public void sendAsync(final Address address, final Command command,
+ final long timeoutMillis,
+ final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
+ final Channel channel = getChannel(address);
+ if (channel == null) {
+ throw new RemotingException("network error");
+ }
+ /**
+ * request unique identification
+ */
+ final long opaque = command.getOpaque();
+ /**
+ * control concurrency number
+ */
+ boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+ if(acquired){
+ final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
- /**
- * register processor
- *
- * @param commandType command type
- * @param processor processor
- * @param executor thread executor
- */
- public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
- this.clientHandler.registerProcessor(commandType, processor, executor);
+ /**
+ * response future
+ */
+ final ResponseFuture responseFuture = new ResponseFuture(opaque,
+ timeoutMillis,
+ invokeCallback,
+ releaseSemaphore);
+ try {
+ channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(future.isSuccess()){
+ responseFuture.setSendOk(true);
+ return;
+ } else {
+ responseFuture.setSendOk(false);
+ }
+ responseFuture.setCause(future.cause());
+ responseFuture.putResponse(null);
+ try {
+ responseFuture.executeInvokeCallback();
+ } catch (Throwable ex){
+ logger.error("execute callback error", ex);
+ } finally{
+ responseFuture.release();
+ }
+ }
+ });
+ } catch (Throwable ex){
+ responseFuture.release();
+ throw new RemotingException(String.format("send command to address: %s failed", address), ex);
+ }
+ } else{
+ String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
+ timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
+ throw new RemotingTooMuchRequestException(message);
+ }
}
/**
- * send connect
+ * sync send
* @param address address
* @param command command
+ * @param timeoutMillis timeoutMillis
+ * @return command
+ * @throws InterruptedException
* @throws RemotingException
*/
- public void send(final Address address, final Command command) throws RemotingException {
+ public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
- throw new RemotingException("network error");
+ throw new RemotingException(String.format("connect to : %s fail", address));
}
- try {
- channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if(future.isSuccess()){
- logger.info("sent command {} to {}", command, address);
- } else{
- logger.error("send command {} to {} failed, error {}", command, address, future.cause());
- }
+ final long opaque = command.getOpaque();
+ final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
+ channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(future.isSuccess()){
+ responseFuture.setSendOk(true);
+ return;
+ } else {
+ responseFuture.setSendOk(false);
}
- });
- } catch (Exception ex) {
- String msg = String.format("send command %s to address %s encounter error", command, address);
- throw new RemotingException(msg, ex);
+ responseFuture.setCause(future.cause());
+ responseFuture.putResponse(null);
+ logger.error("send command {} to address {} failed", command, address);
+ }
+ });
+ /**
+ * sync wait for result
+ */
+ Command result = responseFuture.waitResponse();
+ if(result == null){
+ if(responseFuture.isSendOK()){
+ throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
+ } else{
+ throw new RemotingException(address.toString(), responseFuture.getCause());
+ }
}
+ return result;
}
/**
* get channel
- * @param address address
- * @return channel
+ * @param address
+ * @return
*/
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
@@ -196,9 +268,9 @@ public class NettyRemotingClient {
}
/**
- * create channel
+ * create channel
* @param address address
- * @param isSync is sync
+ * @param isSync sync flag
* @return channel
*/
public Channel createChannel(Address address, boolean isSync) {
@@ -222,15 +294,7 @@ public class NettyRemotingClient {
}
/**
- * get default thread executor
- * @return thread executor
- */
- public ExecutorService getDefaultExecutor() {
- return defaultExecutor;
- }
-
- /**
- * close client
+ * close
*/
public void close() {
if(isStarted.compareAndSet(true, false)){
@@ -239,8 +303,8 @@ public class NettyRemotingClient {
if(workerGroup != null){
this.workerGroup.shutdownGracefully();
}
- if(defaultExecutor != null){
- defaultExecutor.shutdown();
+ if(callbackExecutor != null){
+ this.callbackExecutor.shutdownNow();
}
} catch (Exception ex) {
logger.error("netty client close exception", ex);
@@ -250,7 +314,7 @@ public class NettyRemotingClient {
}
/**
- * close channel
+ * close channels
*/
private void closeChannels(){
for (Channel channel : this.channels.values()) {
@@ -260,10 +324,10 @@ public class NettyRemotingClient {
}
/**
- * remove channel
+ * close channel
* @param address address
*/
- public void removeChannel(Address address){
+ public void closeChannel(Address address){
Channel channel = this.channels.remove(address);
if(channel != null){
channel.close();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index c69bf09540..29b2317633 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -49,7 +49,7 @@ public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
/**
- * server bootstart
+ * server bootstrap
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
index ee95044764..86ba79c884 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -16,7 +16,10 @@
*/
package org.apache.dolphinscheduler.remote.command;
+import com.sun.org.apache.regexp.internal.RE;
+
import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
/**
* receive task log request command and content fill
@@ -24,11 +27,12 @@ import java.io.Serializable;
*/
public class Command implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe;
public Command(){
+ this.opaque = REQUEST_ID.getAndIncrement();
}
public Command(long opaque){
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index a582221cd3..beec055403 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* execute task request command
*/
public class ExecuteTaskRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* application name
*/
private String applicationName;
/**
* group name
*/
private String groupName;
/**
* task name
*/
private String taskName;
/**
* connect port
*/
private int connectorPort;
/**
* description info
*/
private String description;
/**
* class name
*/
private String className;
/**
* method name
*/
private String methodName;
/**
* params
*/
private String params;
/**
* shard items
*/
private List shardItems;
public List getShardItems() {
return shardItems;
}
public void setShardItems(List shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* execute task request command
*/
public class ExecuteTaskRequestCommand implements Serializable {
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* application name
*/
private String applicationName;
/**
* group name
*/
private String groupName;
/**
* task name
*/
private String taskName;
/**
* connector port
*/
private int connectorPort;
/**
* description info
*/
private String description;
/**
* class name
*/
private String className;
/**
* method name
*/
private String methodName;
/**
* parameters
*/
private String params;
/**
* shard itemds
*/
private List shardItems;
public List getShardItems() {
return shardItems;
}
public void setShardItems(List shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index 0268653b5d..7e35fa6e75 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* execute taks response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* result info
*/
private Object result;
/**
* receive time
*/
private long receivedTime;
/**
* execute count
*/
private int executeCount;
/**
* execute time
*/
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
/**
* package response command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* execute task response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* return result
*/
private Object result;
/**
* received time
*/
private long receivedTime;
/**
* execute count
*/
private int executeCount;
/**
* execute time
*/
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(long opaque){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
index 4f32d5f699..c50413e98a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
@@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class Ping implements Serializable {
- private static final AtomicLong ID = new AtomicLong(1);
-
/**
* ping body
*/
@@ -53,7 +51,7 @@ public class Ping implements Serializable {
}
/**
- * ping connect
+ * ping content
* @return result
*/
public static ByteBuf pingContent(){
@@ -61,12 +59,12 @@ public class Ping implements Serializable {
}
/**
- * package ping command
+ * create ping command
*
* @return command
*/
public static Command create(){
- Command command = new Command(ID.getAndIncrement());
+ Command command = new Command();
command.setType(CommandType.PING);
command.setBody(EMPTY_BODY_ARRAY);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
index 9b064b7136..4cc32ed42a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
@@ -29,11 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class GetLogBytesRequestCommand implements Serializable {
- /**
- * request id
- */
- private static final AtomicLong REQUEST = new AtomicLong(1);
-
/**
* log path
*/
@@ -60,7 +55,7 @@ public class GetLogBytesRequestCommand implements Serializable {
* @return command
*/
public Command convert2Command(){
- Command command = new Command(REQUEST.getAndIncrement());
+ Command command = new Command();
command.setType(CommandType.GET_LOG_BYTES_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
index f072c479f4..621d35a804 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -29,11 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class RollViewLogRequestCommand implements Serializable {
- /**
- * request id
- */
- private static final AtomicLong REQUEST = new AtomicLong(1);
-
/**
* log path
*/
@@ -45,7 +40,7 @@ public class RollViewLogRequestCommand implements Serializable {
private int skipLineNum;
/**
- * query log line number limit
+ * query line number
*/
private int limit;
@@ -83,12 +78,12 @@ public class RollViewLogRequestCommand implements Serializable {
}
/**
- * package request command
+ * package request command
*
* @return command
*/
public Command convert2Command(){
- Command command = new Command(REQUEST.getAndIncrement());
+ Command command = new Command();
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
index 5dcefc6233..8835348ee3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -30,10 +30,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class ViewLogRequestCommand implements Serializable {
/**
- * request id
+ * log path
*/
- private static final AtomicLong REQUEST = new AtomicLong(1);
-
private String path;
public ViewLogRequestCommand() {
@@ -57,7 +55,7 @@ public class ViewLogRequestCommand implements Serializable {
* @return command
*/
public Command convert2Command(){
- Command command = new Command(REQUEST.getAndIncrement());
+ Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java
new file mode 100644
index 0000000000..3d91ba57f6
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.exceptions;
+
+
+/**
+ * timeout exception
+ */
+public class RemotingTimeoutException extends RemotingException{
+
+ public RemotingTimeoutException(String message) {
+ super(message);
+ }
+
+
+ public RemotingTimeoutException(String address, long timeoutMillis) {
+ this(address, timeoutMillis, null);
+ }
+
+ public RemotingTimeoutException(String address, long timeoutMillis, Throwable cause) {
+ super(String.format("wait response on the channel %s timeout %s", address, timeoutMillis), cause);
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java
new file mode 100644
index 0000000000..82cc3f4dbf
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.exceptions;
+
+/**
+ * too much request exception
+ */
+public class RemotingTooMuchRequestException extends RemotingException{
+
+ public RemotingTooMuchRequestException(String message) {
+ super(message);
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
new file mode 100644
index 0000000000..84cdae867b
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.future;
+
+/**
+ * invoke callback
+ */
+public interface InvokeCallback {
+
+ /**
+ * operation
+ *
+ * @param responseFuture responseFuture
+ */
+ void operationComplete(final ResponseFuture responseFuture);
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java
new file mode 100644
index 0000000000..95a04b1f1a
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.future;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * release semaphore
+ */
+public class ReleaseSemaphore {
+
+ private final Semaphore semaphore;
+
+ private final AtomicBoolean released;
+
+ public ReleaseSemaphore(Semaphore semaphore){
+ this.semaphore = semaphore;
+ this.released = new AtomicBoolean(false);
+ }
+
+ public void release(){
+ if(this.released.compareAndSet(false, true)){
+ this.semaphore.release();
+ }
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
new file mode 100644
index 0000000000..caff34236e
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.future;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+
+import java.util.concurrent.*;
+
+/**
+ * response future
+ */
+public class ResponseFuture {
+
+ private final static ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256);
+
+ /**
+ * request unique identification
+ */
+ private final long opaque;
+
+ /**
+ * timeout
+ */
+ private final long timeoutMillis;
+
+ /**
+ * invokeCallback function
+ */
+ private final InvokeCallback invokeCallback;
+
+ /**
+ * releaseSemaphore
+ */
+ private final ReleaseSemaphore releaseSemaphore;
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private final long beginTimestamp = System.currentTimeMillis();
+
+ /**
+ * response command
+ */
+ private volatile Command responseCommand;
+
+ private volatile boolean sendOk = true;
+
+ private volatile Throwable cause;
+
+ public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback, ReleaseSemaphore releaseSemaphore) {
+ this.opaque = opaque;
+ this.timeoutMillis = timeoutMillis;
+ this.invokeCallback = invokeCallback;
+ this.releaseSemaphore = releaseSemaphore;
+ FUTURE_TABLE.put(opaque, this);
+ }
+
+ /**
+ * wait for response
+ *
+ * @return command
+ * @throws InterruptedException
+ */
+ public Command waitResponse() throws InterruptedException {
+ this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ return this.responseCommand;
+ }
+
+ /**
+ * put response
+ *
+ * @param responseCommand responseCommand
+ */
+ public void putResponse(final Command responseCommand) {
+ this.responseCommand = responseCommand;
+ this.latch.countDown();
+ FUTURE_TABLE.remove(opaque);
+ }
+
+ public static ResponseFuture getFuture(long opaque){
+ return FUTURE_TABLE.get(opaque);
+ }
+
+ /**
+ * whether timeout
+ * @return timeout
+ */
+ public boolean isTimeout() {
+ long diff = System.currentTimeMillis() - this.beginTimestamp;
+ return diff > this.timeoutMillis;
+ }
+
+ /**
+ * execute invoke callback
+ */
+ public void executeInvokeCallback() {
+ if (invokeCallback != null) {
+ invokeCallback.operationComplete(this);
+ }
+ }
+
+ public boolean isSendOK() {
+ return sendOk;
+ }
+
+ public void setSendOk(boolean sendOk) {
+ this.sendOk = sendOk;
+ }
+
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+
+ public long getOpaque() {
+ return opaque;
+ }
+
+ public long getTimeoutMillis() {
+ return timeoutMillis;
+ }
+
+ public long getBeginTimestamp() {
+ return beginTimestamp;
+ }
+
+ public Command getResponseCommand() {
+ return responseCommand;
+ }
+
+ public void setResponseCommand(Command responseCommand) {
+ this.responseCommand = responseCommand;
+ }
+
+ public InvokeCallback getInvokeCallback() {
+ return invokeCallback;
+ }
+
+ /**
+ * release
+ */
+ public void release() {
+ if(this.releaseSemaphore != null){
+ this.releaseSemaphore.release();
+ }
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index 6aceb5a41b..d5d0d4df83 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -19,16 +19,12 @@ package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
/**
* netty client request handler
@@ -39,17 +35,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/**
- * netty remote client
+ * netty client
*/
private final NettyRemotingClient nettyRemotingClient;
/**
- * client processors queue
+ * callback thread executor
*/
- private final ConcurrentHashMap> processors = new ConcurrentHashMap();
+ private final ExecutorService callbackExecutor;
- public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
+ public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
this.nettyRemotingClient = nettyRemotingClient;
+ this.callbackExecutor = callbackExecutor;
}
/**
@@ -61,7 +58,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
@@ -74,80 +71,50 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- processReceived(ctx.channel(), (Command)msg);
- }
-
- /**
- * register processor
- *
- * @param commandType command type
- * @param processor processor
- */
- public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
- this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
- }
-
- /**
- * register processor
- *
- * @param commandType command type
- * @param processor processor
- * @param executor thread executor
- */
- public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
- ExecutorService executorRef = executor;
- if(executorRef == null){
- executorRef = nettyRemotingClient.getDefaultExecutor();
- }
- this.processors.putIfAbsent(commandType, new Pair(processor, executorRef));
+ processReceived((Command)msg);
}
/**
* process received logic
*
- * @param channel channel
- * @param msg message
+ * @param responseCommand responseCommand
*/
- private void processReceived(final Channel channel, final Command msg) {
- final CommandType commandType = msg.getType();
- final Pair pair = processors.get(commandType);
- if (pair != null) {
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- pair.getLeft().process(channel, msg);
- } catch (Throwable ex) {
- logger.error("process msg {} error : {}", msg, ex);
+ private void processReceived(final Command responseCommand) {
+ ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
+ if(future != null){
+ future.setResponseCommand(responseCommand);
+ future.release();
+ if(future.getInvokeCallback() != null){
+ this.callbackExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ future.executeInvokeCallback();
}
- }
- };
- try {
- pair.getRight().submit(r);
- } catch (RejectedExecutionException e) {
- logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+ });
+ } else{
+ future.putResponse(responseCommand);
}
- } else {
- logger.warn("commandType {} not support", commandType);
+ } else{
+ logger.warn("receive response {}, but not matched any request ", responseCommand);
}
}
/**
* caught exception
- *
* @param ctx channel handler context
- * @param cause cause
+ * @param cause cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
- nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
/**
* channel write changed
+ *
* @param ctx channel handler context
* @throws Exception
*/
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java
new file mode 100644
index 0000000000..048ea86acb
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * caller thread execute
+ */
+public class CallerThreadExecutePolicy implements RejectedExecutionHandler {
+
+ private final Logger logger = LoggerFactory.getLogger(CallerThreadExecutePolicy.class);
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ logger.warn("queue is full, trigger caller thread execute");
+ r.run();
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java
new file mode 100644
index 0000000000..2f0d05ebd4
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * thread factory
+ */
+public class NamedThreadFactory implements ThreadFactory {
+
+ private final AtomicInteger increment = new AtomicInteger(1);
+
+ /**
+ * name
+ */
+ private final String name;
+
+ /**
+ * count
+ */
+ private final int count;
+
+ public NamedThreadFactory(String name){
+ this(name, 0);
+ }
+
+ public NamedThreadFactory(String name, int count){
+ this.name = name;
+ this.count = count;
+ }
+
+ /**
+ * create thread
+ * @param r runnable
+ * @return thread
+ */
+ @Override
+ public Thread newThread(Runnable r) {
+ final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement())
+ : String.format(name + "_%d", increment.getAndIncrement());
+ Thread t = new Thread(r, threadName);
+ t.setDaemon(true);
+ return t;
+ }
+}
diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
index 6f0a802af6..b6f8e2a8de 100644
--- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
+++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.command.Ping;
import org.apache.dolphinscheduler.remote.command.Pong;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.future.InvokeCallback;
+import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.junit.Assert;
@@ -39,10 +41,10 @@ public class NettyRemotingClientTest {
/**
- * test ping
+ * test sned sync
*/
@Test
- public void testSend(){
+ public void testSendSync(){
NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
@@ -52,26 +54,54 @@ public class NettyRemotingClientTest {
channel.writeAndFlush(Pong.create(command.getOpaque()));
}
});
+
+
server.start();
//
- CountDownLatch latch = new CountDownLatch(1);
- AtomicLong opaque = new AtomicLong(1);
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
- client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
+ Command commandPing = Ping.create();
+ try {
+ Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
+ Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * test sned async
+ */
+ @Test
+ public void testSendAsync(){
+ NettyServerConfig serverConfig = new NettyServerConfig();
+
+ NettyRemotingServer server = new NettyRemotingServer(serverConfig);
+ server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
@Override
public void process(Channel channel, Command command) {
- opaque.set(command.getOpaque());
- latch.countDown();
+ channel.writeAndFlush(Pong.create(command.getOpaque()));
}
});
+ server.start();
+ //
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ NettyRemotingClient client = new NettyRemotingClient(clientConfig);
+ CountDownLatch latch = new CountDownLatch(1);
Command commandPing = Ping.create();
try {
- client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
+ final AtomicLong opaque = new AtomicLong(0);
+ client.sendAsync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ opaque.set(responseFuture.getOpaque());
+ latch.countDown();
+ }
+ });
latch.await();
+ Assert.assertEquals(commandPing.getOpaque(), opaque.get());
} catch (Exception e) {
e.printStackTrace();
}
- Assert.assertEquals(opaque.get(), commandPing.getOpaque());
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 90711e1d14..e0c00c55d9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -378,8 +378,8 @@ public class ProcessUtils {
LogClientService logClient = null;
String log = null;
try {
- logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
- log = logClient.viewLog(taskInstance.getLogPath());
+ logClient = new LogClientService();
+ log = logClient.viewLog(taskInstance.getHost(), Constants.RPC_PORT, taskInstance.getLogPath());
} finally {
if(logClient != null){
logClient.close();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index aa6999ef0d..5daf535625 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -16,13 +16,11 @@
*/
package org.apache.dolphinscheduler.service.log;
-import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger;
@@ -32,7 +30,7 @@ import org.slf4j.LoggerFactory;
/**
* log client
*/
-public class LogClientService implements NettyRequestProcessor {
+public class LogClientService {
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
@@ -40,8 +38,6 @@ public class LogClientService implements NettyRequestProcessor {
private final NettyRemotingClient client;
- private final Address address;
-
/**
* request time out
*/
@@ -49,18 +45,11 @@ public class LogClientService implements NettyRequestProcessor {
/**
* construct client
- * @param host host
- * @param port port
*/
- public LogClientService(String host, int port) {
- this.address = new Address(host, port);
+ public LogClientService() {
this.clientConfig = new NettyClientConfig();
- this.clientConfig.setWorkerThreads(1);
+ this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig);
- this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
- this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
- this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
-
}
/**
@@ -73,94 +62,87 @@ public class LogClientService implements NettyRequestProcessor {
/**
* roll view log
+ * @param host host
+ * @param port port
* @param path path
* @param skipLineNum skip line number
* @param limit limit
* @return log content
*/
- public String rollViewLog(String path,int skipLineNum,int limit) {
- logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
+ public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) {
+ logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
+ final Address address = new Address(host, port);
try {
Command command = request.convert2Command();
- this.client.send(address, command);
- LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
- result = ((String)promise.getResult());
+ Command response = this.client.sendSync(address, command, logRequestTimeout);
+ if(response != null){
+ RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
+ response.getBody(), RollViewLogResponseCommand.class);
+ return rollReviewLog.getMsg();
+ }
} catch (Exception e) {
logger.error("roll view log error", e);
+ } finally {
+ this.client.closeChannel(address);
}
return result;
}
/**
* view log
+ * @param host host
+ * @param port port
* @param path path
* @return log content
*/
- public String viewLog(String path) {
+ public String viewLog(String host, int port, String path) {
logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
+ final Address address = new Address(host, port);
try {
Command command = request.convert2Command();
- this.client.send(address, command);
- LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
- result = ((String)promise.getResult());
+ Command response = this.client.sendSync(address, command, logRequestTimeout);
+ if(response != null){
+ ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
+ response.getBody(), ViewLogResponseCommand.class);
+ return viewLog.getMsg();
+ }
} catch (Exception e) {
logger.error("view log error", e);
+ } finally {
+ this.client.closeChannel(address);
}
return result;
}
/**
* get log size
+ * @param host host
+ * @param port port
* @param path log path
* @return log content bytes
*/
- public byte[] getLogBytes(String path) {
+ public byte[] getLogBytes(String host, int port, String path) {
logger.info("log path {}", path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null;
+ final Address address = new Address(host, port);
try {
Command command = request.convert2Command();
- this.client.send(address, command);
- LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
- result = (byte[])promise.getResult();
+ Command response = this.client.sendSync(address, command, logRequestTimeout);
+ if(response != null){
+ GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
+ response.getBody(), GetLogBytesResponseCommand.class);
+ return getLog.getData();
+ }
} catch (Exception e) {
logger.error("get log size error", e);
+ } finally {
+ this.client.closeChannel(address);
}
return result;
}
-
- @Override
- public void process(Channel channel, Command command) {
- logger.info("received log response : {}", command);
- switch (command.getType()){
- case ROLL_VIEW_LOG_RESPONSE:
- RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
- command.getBody(), RollViewLogResponseCommand.class);
- LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
- break;
- case VIEW_WHOLE_LOG_RESPONSE:
- ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
- command.getBody(), ViewLogResponseCommand.class);
- LogPromise.notify(command.getOpaque(), viewLog.getMsg());
- break;
- case GET_LOG_BYTES_RESPONSE:
- GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
- command.getBody(), GetLogBytesResponseCommand.class);
- LogPromise.notify(command.getOpaque(), getLog.getData());
- break;
- default:
- throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
- }
- }
-
- public static void main(String[] args) throws Exception{
- LogClientService logClient = new LogClientService("192.168.220.247", 50051);
- byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
- System.out.println(new String(logBytes));
- }
-
}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
index 8920b8a527..98ee3fdbbf 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
@@ -28,14 +28,29 @@ public class LogPromise {
private static final ConcurrentHashMap PROMISES = new ConcurrentHashMap<>();
+ /**
+ * request unique identification
+ */
private long opaque;
+ /**
+ * start timemillis
+ */
private final long start;
+ /**
+ * timeout
+ */
private final long timeout;
+ /**
+ * latch
+ */
private final CountDownLatch latch;
+ /**
+ * result
+ */
private Object result;
public LogPromise(long opaque, long timeout){
@@ -59,15 +74,28 @@ public class LogPromise {
}
}
+ /**
+ * countdown
+ *
+ * @param result result
+ */
private void doCountDown(Object result){
this.result = result;
this.latch.countDown();
}
+ /**
+ * whether timeout
+ * @return timeout
+ */
public boolean isTimeout(){
return System.currentTimeMillis() - start > timeout;
}
+ /**
+ * get result
+ * @return
+ */
public Object getResult(){
try {
latch.await(timeout, TimeUnit.MILLISECONDS);