diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index e07cfd6404..357fd6d19d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -51,24 +51,55 @@ public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
+ /**
+ * client bootstrap
+ */
private final Bootstrap bootstrap = new Bootstrap();
+ /**
+ * encoder
+ */
private final NettyEncoder encoder = new NettyEncoder();
+ /**
+ * channels
+ */
private final ConcurrentHashMap
channels = new ConcurrentHashMap(128);
+ /**
+ * started flag
+ */
private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ /**
+ * worker group
+ */
private final NioEventLoopGroup workerGroup;
+ /**
+ * client config
+ */
private final NettyClientConfig clientConfig;
+ /**
+ * saync semaphore
+ */
private final Semaphore asyncSemaphore = new Semaphore(200, true);
+ /**
+ * callback thread executor
+ */
private final ExecutorService callbackExecutor;
+ /**
+ * client handler
+ */
private final NettyClientHandler clientHandler;
+ /**
+ * client init
+ * @param clientConfig client config
+ */
public NettyRemotingClient(final NettyClientConfig clientConfig){
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
@@ -80,12 +111,16 @@ public class NettyRemotingClient {
}
});
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy());
+ new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
+ new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.start();
}
+ /**
+ * start
+ */
private void start(){
this.bootstrap
@@ -108,16 +143,40 @@ public class NettyRemotingClient {
isStarted.compareAndSet(false, true);
}
- public void sendAsync(final Address address, final Command command, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
+ /**
+ * async send
+ * @param address address
+ * @param command command
+ * @param timeoutMillis timeoutMillis
+ * @param invokeCallback callback function
+ * @throws InterruptedException
+ * @throws RemotingException
+ */
+ public void sendAsync(final Address address, final Command command,
+ final long timeoutMillis,
+ final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException("network error");
}
+ /**
+ * request unique identification
+ */
final long opaque = command.getOpaque();
+ /**
+ * control concurrency number
+ */
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if(acquired){
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
- final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore);
+
+ /**
+ * response future
+ */
+ final ResponseFuture responseFuture = new ResponseFuture(opaque,
+ timeoutMillis,
+ invokeCallback,
+ releaseSemaphore);
try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
@@ -151,6 +210,15 @@ public class NettyRemotingClient {
}
}
+ /**
+ * sync send
+ * @param address address
+ * @param command command
+ * @param timeoutMillis timeoutMillis
+ * @return command
+ * @throws InterruptedException
+ * @throws RemotingException
+ */
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
@@ -172,6 +240,9 @@ public class NettyRemotingClient {
logger.error("send command {} to address {} failed", command, address);
}
});
+ /**
+ * sync wait for result
+ */
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
@@ -183,6 +254,11 @@ public class NettyRemotingClient {
return result;
}
+ /**
+ * get channel
+ * @param address
+ * @return
+ */
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
if(channel != null && channel.isActive()){
@@ -191,6 +267,12 @@ public class NettyRemotingClient {
return createChannel(address, true);
}
+ /**
+ * create channel
+ * @param address address
+ * @param isSync sync flag
+ * @return channel
+ */
public Channel createChannel(Address address, boolean isSync) {
ChannelFuture future;
try {
@@ -211,6 +293,9 @@ public class NettyRemotingClient {
return null;
}
+ /**
+ * close
+ */
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
@@ -228,6 +313,9 @@ public class NettyRemotingClient {
}
}
+ /**
+ * close channels
+ */
private void closeChannels(){
for (Channel channel : this.channels.values()) {
channel.close();
@@ -235,6 +323,10 @@ public class NettyRemotingClient {
this.channels.clear();
}
+ /**
+ * close channel
+ * @param address address
+ */
public void closeChannel(Address address){
Channel channel = this.channels.remove(address);
if(channel != null){
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index c69bf09540..29b2317633 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -49,7 +49,7 @@ public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
/**
- * server bootstart
+ * server bootstrap
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index b3801aceb0..beec055403 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskRequestCommand implements Serializable {
private String taskId;
private String attemptId;
private String applicationName;
private String groupName;
private String taskName;
private int connectorPort;
private String description;
private String className;
private String methodName;
private String params;
private List shardItems;
public List getShardItems() {
return shardItems;
}
public void setShardItems(List shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* execute task request command
*/
public class ExecuteTaskRequestCommand implements Serializable {
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* application name
*/
private String applicationName;
/**
* group name
*/
private String groupName;
/**
* task name
*/
private String taskName;
/**
* connector port
*/
private int connectorPort;
/**
* description info
*/
private String description;
/**
* class name
*/
private String className;
/**
* method name
*/
private String methodName;
/**
* parameters
*/
private String params;
/**
* shard itemds
*/
private List shardItems;
public List getShardItems() {
return shardItems;
}
public void setShardItems(List shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index aeb5f7d858..7e35fa6e75 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskResponseCommand implements Serializable {
private String taskId;
private String attemptId;
private Object result;
private long receivedTime;
private int executeCount;
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(long opaque){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* execute task response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
/**
* task id
*/
private String taskId;
/**
* attempt id
*/
private String attemptId;
/**
* return result
*/
private Object result;
/**
* received time
*/
private long receivedTime;
/**
* execute count
*/
private int executeCount;
/**
* execute time
*/
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(long opaque){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
index 38a8b14871..c50413e98a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
@@ -23,11 +23,19 @@ import io.netty.buffer.Unpooled;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
-
+/**
+ * ping machine
+ */
public class Ping implements Serializable {
+ /**
+ * ping body
+ */
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+ /**
+ * request command body
+ */
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
private static final ByteBuf PING_BUF;
@@ -42,10 +50,19 @@ public class Ping implements Serializable {
PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
}
+ /**
+ * ping content
+ * @return result
+ */
public static ByteBuf pingContent(){
return PING_BUF.duplicate();
}
+ /**
+ * create ping command
+ *
+ * @return command
+ */
public static Command create(){
Command command = new Command();
command.setType(CommandType.PING);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
index 088bdd674e..4cc32ed42a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
@@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class GetLogBytesRequestCommand implements Serializable {
+ /**
+ * log path
+ */
private String path;
public GetLogBytesRequestCommand() {
@@ -47,8 +50,9 @@ public class GetLogBytesRequestCommand implements Serializable {
}
/**
+ * package request command
*
- * @return
+ * @return command
*/
public Command convert2Command(){
Command command = new Command();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
index 339c1a7f95..621d35a804 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -29,10 +29,19 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class RollViewLogRequestCommand implements Serializable {
+ /**
+ * log path
+ */
private String path;
+ /**
+ * skip line number
+ */
private int skipLineNum;
+ /**
+ * query line number
+ */
private int limit;
public RollViewLogRequestCommand() {
@@ -68,6 +77,11 @@ public class RollViewLogRequestCommand implements Serializable {
this.limit = limit;
}
+ /**
+ * package request command
+ *
+ * @return command
+ */
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
index 69f6009256..8835348ee3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ViewLogRequestCommand implements Serializable {
+ /**
+ * log path
+ */
private String path;
public ViewLogRequestCommand() {
@@ -46,6 +49,11 @@ public class ViewLogRequestCommand implements Serializable {
this.path = path;
}
+ /**
+ * package request command
+ *
+ * @return command
+ */
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java
index aaf9170781..3d91ba57f6 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java
@@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.remote.exceptions;
+/**
+ * timeout exception
+ */
public class RemotingTimeoutException extends RemotingException{
public RemotingTimeoutException(String message) {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java
index 5ee11a04a7..82cc3f4dbf 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java
@@ -16,6 +16,9 @@
*/
package org.apache.dolphinscheduler.remote.exceptions;
+/**
+ * too much request exception
+ */
public class RemotingTooMuchRequestException extends RemotingException{
public RemotingTooMuchRequestException(String message) {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
index 7cf875b002..84cdae867b 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
@@ -21,6 +21,11 @@ package org.apache.dolphinscheduler.remote.future;
*/
public interface InvokeCallback {
+ /**
+ * operation
+ *
+ * @param responseFuture responseFuture
+ */
void operationComplete(final ResponseFuture responseFuture);
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
index a9bdb39adf..caff34236e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
@@ -28,18 +28,33 @@ public class ResponseFuture {
private final static ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256);
+ /**
+ * request unique identification
+ */
private final long opaque;
+ /**
+ * timeout
+ */
private final long timeoutMillis;
+ /**
+ * invokeCallback function
+ */
private final InvokeCallback invokeCallback;
+ /**
+ * releaseSemaphore
+ */
private final ReleaseSemaphore releaseSemaphore;
private final CountDownLatch latch = new CountDownLatch(1);
private final long beginTimestamp = System.currentTimeMillis();
+ /**
+ * response command
+ */
private volatile Command responseCommand;
private volatile boolean sendOk = true;
@@ -54,11 +69,22 @@ public class ResponseFuture {
FUTURE_TABLE.put(opaque, this);
}
+ /**
+ * wait for response
+ *
+ * @return command
+ * @throws InterruptedException
+ */
public Command waitResponse() throws InterruptedException {
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
+ /**
+ * put response
+ *
+ * @param responseCommand responseCommand
+ */
public void putResponse(final Command responseCommand) {
this.responseCommand = responseCommand;
this.latch.countDown();
@@ -69,11 +95,18 @@ public class ResponseFuture {
return FUTURE_TABLE.get(opaque);
}
+ /**
+ * whether timeout
+ * @return timeout
+ */
public boolean isTimeout() {
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
+ /**
+ * execute invoke callback
+ */
public void executeInvokeCallback() {
if (invokeCallback != null) {
invokeCallback.operationComplete(this);
@@ -120,6 +153,9 @@ public class ResponseFuture {
return invokeCallback;
}
+ /**
+ * release
+ */
public void release() {
if(this.releaseSemaphore != null){
this.releaseSemaphore.release();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index 97f6632fb3..d5d0d4df83 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -34,8 +34,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
+ /**
+ * netty client
+ */
private final NettyRemotingClient nettyRemotingClient;
+ /**
+ * callback thread executor
+ */
private final ExecutorService callbackExecutor;
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
@@ -43,17 +49,36 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
this.callbackExecutor = callbackExecutor;
}
+ /**
+ * When the current channel is not active,
+ * the current channel has reached the end of its life cycle
+ *
+ * @param ctx channel handler context
+ * @throws Exception
+ */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
+ /**
+ * The current channel reads data from the remote
+ *
+ * @param ctx channel handler context
+ * @param msg message
+ * @throws Exception
+ */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived((Command)msg);
}
+ /**
+ * process received logic
+ *
+ * @param responseCommand responseCommand
+ */
private void processReceived(final Command responseCommand) {
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
if(future != null){
@@ -74,6 +99,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
}
}
+ /**
+ * caught exception
+ * @param ctx channel handler context
+ * @param cause cause
+ * @throws Exception
+ */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
@@ -81,6 +112,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
ctx.channel().close();
}
+ /**
+ * channel write changed
+ *
+ * @param ctx channel handler context
+ * @throws Exception
+ */
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java
index bef64c7dc1..2f0d05ebd4 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java
@@ -19,12 +19,21 @@ package org.apache.dolphinscheduler.remote.utils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * thread factory
+ */
public class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger increment = new AtomicInteger(1);
+ /**
+ * name
+ */
private final String name;
+ /**
+ * count
+ */
private final int count;
public NamedThreadFactory(String name){
@@ -36,6 +45,11 @@ public class NamedThreadFactory implements ThreadFactory {
this.count = count;
}
+ /**
+ * create thread
+ * @param r runnable
+ * @return thread
+ */
@Override
public Thread newThread(Runnable r) {
final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement())
diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
index 1ad6734b49..b6f8e2a8de 100644
--- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
+++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -41,7 +41,7 @@ public class NettyRemotingClientTest {
/**
- * test ping
+ * test sned sync
*/
@Test
public void testSendSync(){
@@ -69,6 +69,9 @@ public class NettyRemotingClientTest {
}
}
+ /**
+ * test sned async
+ */
@Test
public void testSendAsync(){
NettyServerConfig serverConfig = new NettyServerConfig();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
index 8920b8a527..98ee3fdbbf 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
@@ -28,14 +28,29 @@ public class LogPromise {
private static final ConcurrentHashMap PROMISES = new ConcurrentHashMap<>();
+ /**
+ * request unique identification
+ */
private long opaque;
+ /**
+ * start timemillis
+ */
private final long start;
+ /**
+ * timeout
+ */
private final long timeout;
+ /**
+ * latch
+ */
private final CountDownLatch latch;
+ /**
+ * result
+ */
private Object result;
public LogPromise(long opaque, long timeout){
@@ -59,15 +74,28 @@ public class LogPromise {
}
}
+ /**
+ * countdown
+ *
+ * @param result result
+ */
private void doCountDown(Object result){
this.result = result;
this.latch.countDown();
}
+ /**
+ * whether timeout
+ * @return timeout
+ */
public boolean isTimeout(){
return System.currentTimeMillis() - start > timeout;
}
+ /**
+ * get result
+ * @return
+ */
public Object getResult(){
try {
latch.await(timeout, TimeUnit.MILLISECONDS);