From 1ac1d582e6d0d2ba1adb45bd150e8a7854ec32f3 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 13 Feb 2020 15:55:10 +0800 Subject: [PATCH] remote module add comment --- .../remote/NettyRemotingClient.java | 77 ++++++++++++++++++- .../remote/NettyRemotingServer.java | 55 +++++++++++++ .../remote/codec/NettyDecoder.java | 17 ++++ .../remote/codec/NettyEncoder.java | 8 ++ .../remote/command/Command.java | 5 +- .../remote/command/CommandHeader.java | 9 +++ .../remote/command/CommandType.java | 2 +- .../command/ExecuteTaskRequestCommand.java | 2 +- .../command/ExecuteTaskResponseCommand.java | 2 +- .../dolphinscheduler/remote/command/Ping.java | 19 ++++- .../dolphinscheduler/remote/command/Pong.java | 23 +++++- .../log/GetLogBytesRequestCommand.java | 9 ++- .../log/GetLogBytesResponseCommand.java | 9 +++ .../log/RollViewLogRequestCommand.java | 17 ++++ .../log/RollViewLogResponseCommand.java | 9 +++ .../command/log/ViewLogRequestCommand.java | 8 ++ .../command/log/ViewLogResponseCommand.java | 9 +++ .../remote/config/NettyClientConfig.java | 15 ++++ .../remote/config/NettyServerConfig.java | 21 +++++ .../remote/handler/NettyClientHandler.java | 51 ++++++++++++ .../remote/handler/NettyServerHandler.java | 52 +++++++++++++ .../processor/NettyRequestProcessor.java | 5 ++ .../remote/utils/Address.java | 6 ++ .../remote/utils/ChannelUtils.java | 16 ++++ .../remote/utils/Constants.java | 10 +++ .../remote/utils/FastJsonSerializer.java | 21 +++++ .../dolphinscheduler/remote/utils/Pair.java | 6 ++ .../remote/NettyRemotingClientTest.java | 6 ++ 28 files changed, 481 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 678fe84f90..df0c13ad38 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -50,22 +50,51 @@ public class NettyRemotingClient { private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class); + /** + * bootstrap + */ private final Bootstrap bootstrap = new Bootstrap(); + /** + * encoder + */ private final NettyEncoder encoder = new NettyEncoder(); + /** + * channels + */ private final ConcurrentHashMap channels = new ConcurrentHashMap(); + /** + * default executor + */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); + /** + * started flag + */ private final AtomicBoolean isStarted = new AtomicBoolean(false); + /** + * worker group + */ private final NioEventLoopGroup workerGroup; + /** + * client handler + */ private final NettyClientHandler clientHandler = new NettyClientHandler(this); + /** + * netty client config + */ private final NettyClientConfig clientConfig; + /** + * netty client init + * + * @param clientConfig client config + */ public NettyRemotingClient(final NettyClientConfig clientConfig){ this.clientConfig = clientConfig; this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { @@ -79,6 +108,9 @@ public class NettyRemotingClient { this.start(); } + /** + * netty server start + */ private void start(){ this.bootstrap @@ -97,18 +129,36 @@ public class NettyRemotingClient { encoder); } }); - // isStarted.compareAndSet(false, true); } + /** + * register processor + * + * @param commandType command type + * @param processor processor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { registerProcessor(commandType, processor, null); } + /** + * register processor + * + * @param commandType command type + * @param processor processor + * @param executor thread executor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.clientHandler.registerProcessor(commandType, processor, executor); } + /** + * send connect + * @param address address + * @param command command + * @throws RemotingException + */ public void send(final Address address, final Command command) throws RemotingException { final Channel channel = getChannel(address); if (channel == null) { @@ -132,6 +182,11 @@ public class NettyRemotingClient { } } + /** + * get channel + * @param address address + * @return channel + */ public Channel getChannel(Address address) { Channel channel = channels.get(address); if(channel != null && channel.isActive()){ @@ -140,6 +195,12 @@ public class NettyRemotingClient { return createChannel(address, true); } + /** + * create channel + * @param address address + * @param isSync is sync + * @return channel + */ public Channel createChannel(Address address, boolean isSync) { ChannelFuture future; try { @@ -160,10 +221,17 @@ public class NettyRemotingClient { return null; } + /** + * get default thread executor + * @return thread executor + */ public ExecutorService getDefaultExecutor() { return defaultExecutor; } + /** + * close client + */ public void close() { if(isStarted.compareAndSet(true, false)){ try { @@ -181,6 +249,9 @@ public class NettyRemotingClient { } } + /** + * close channel + */ private void closeChannels(){ for (Channel channel : this.channels.values()) { channel.close(); @@ -188,6 +259,10 @@ public class NettyRemotingClient { this.channels.clear(); } + /** + * remove channel + * @param address address + */ public void removeChannel(Address address){ Channel channel = this.channels.remove(address); if(channel != null){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index 5823dbb088..c69bf09540 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -48,28 +48,58 @@ public class NettyRemotingServer { private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class); + /** + * server bootstart + */ private final ServerBootstrap serverBootstrap = new ServerBootstrap(); + /** + * encoder + */ private final NettyEncoder encoder = new NettyEncoder(); + /** + * default executor + */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); + /** + * boss group + */ private final NioEventLoopGroup bossGroup; + /** + * worker group + */ private final NioEventLoopGroup workGroup; + /** + * server config + */ private final NettyServerConfig serverConfig; + /** + * server handler + */ private final NettyServerHandler serverHandler = new NettyServerHandler(this); + /** + * started flag + */ private final AtomicBoolean isStarted = new AtomicBoolean(false); + /** + * server init + * + * @param serverConfig server config + */ public NettyRemotingServer(final NettyServerConfig serverConfig){ this.serverConfig = serverConfig; this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); + @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); } @@ -78,12 +108,16 @@ public class NettyRemotingServer { this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); + @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); } }); } + /** + * server start + */ public void start(){ if(this.isStarted.get()){ @@ -125,6 +159,11 @@ public class NettyRemotingServer { isStarted.compareAndSet(false, true); } + /** + * init netty channel + * @param ch socket channel + * @throws Exception + */ private void initNettyChannel(NioSocketChannel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder", encoder); @@ -132,14 +171,30 @@ public class NettyRemotingServer { pipeline.addLast("handler", serverHandler); } + /** + * register processor + * @param commandType command type + * @param processor processor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } + /** + * register processor + * + * @param commandType command type + * @param processor processor + * @param executor thread executor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.serverHandler.registerProcessor(commandType, processor, executor); } + /** + * get default thread executor + * @return thread executor + */ public ExecutorService getDefaultExecutor() { return defaultExecutor; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java index 998f4ee7d5..caa4fbdd17 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java @@ -38,6 +38,14 @@ public class NettyDecoder extends ReplayingDecoder { private final CommandHeader commandHeader = new CommandHeader(); + /** + * decode + * + * @param ctx channel handler context + * @param in byte buffer + * @param out out content + * @throws Exception + */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { switch (state()){ @@ -67,6 +75,11 @@ public class NettyDecoder extends ReplayingDecoder { } } + /** + * get command type + * @param type type + * @return + */ private CommandType commandType(byte type){ for(CommandType ct : CommandType.values()){ if(ct.ordinal() == type){ @@ -76,6 +89,10 @@ public class NettyDecoder extends ReplayingDecoder { return null; } + /** + * check magic + * @param magic magic + */ private void checkMagic(byte magic) { if (magic != Command.MAGIC) { throw new IllegalArgumentException("illegal packet [magic]" + magic); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java index dd4e523a14..4e9836a26f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java @@ -28,6 +28,14 @@ import org.apache.dolphinscheduler.remote.command.Command; @Sharable public class NettyEncoder extends MessageToByteEncoder { + /** + * encode + * + * @param ctx channel handler context + * @param msg command + * @param out byte buffer + * @throws Exception + */ @Override protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception { if(msg == null){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java index 4687db39c0..ee95044764 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java @@ -36,7 +36,7 @@ public class Command implements Serializable { } /** - * comman type + * command type */ private CommandType type; @@ -45,6 +45,9 @@ public class Command implements Serializable { */ private long opaque; + /** + * data body + */ private byte[] body; public CommandType getType() { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java index 92f7ac35dc..78948a5c0c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java @@ -23,10 +23,19 @@ import java.io.Serializable; */ public class CommandHeader implements Serializable { + /** + * type + */ private byte type; + /** + * request unique identification + */ private long opaque; + /** + * body length + */ private int bodyLength; public int getBodyLength() { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 185358a73c..b1b24d3303 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task response */ EXECUTE_TASK_RESPONSE, PING, PONG; } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index e75c2de379..a582221cd3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); private String taskId; private String attemptId; private String applicationName; private String groupName; private String taskName; private int connectorPort; private String description; private String className; private String methodName; private String params; private List shardItems; public List getShardItems() { return shardItems; } public void setShardItems(List shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * application name */ private String applicationName; /** * group name */ private String groupName; /** * task name */ private String taskName; /** * connect port */ private int connectorPort; /** * description info */ private String description; /** * class name */ private String className; /** * method name */ private String methodName; /** * params */ private String params; /** * shard items */ private List shardItems; public List getShardItems() { return shardItems; } public void setShardItems(List shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java index fafb57535b..0268653b5d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); private String taskId; private String attemptId; private Object result; private long receivedTime; private int executeCount; private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * execute taks response command */ public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * result info */ private Object result; /** * receive time */ private long receivedTime; /** * execute count */ private int executeCount; /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } /** * package response command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java index 365d451564..4f32d5f699 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java @@ -23,13 +23,21 @@ import io.netty.buffer.Unpooled; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; - +/** + * ping machine + */ public class Ping implements Serializable { private static final AtomicLong ID = new AtomicLong(1); + /** + * ping body + */ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; + /** + * request command body + */ private static byte[] EMPTY_BODY_ARRAY = new byte[0]; private static final ByteBuf PING_BUF; @@ -44,10 +52,19 @@ public class Ping implements Serializable { PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); } + /** + * ping connect + * @return result + */ public static ByteBuf pingContent(){ return PING_BUF.duplicate(); } + /** + * package ping command + * + * @return command + */ public static Command create(){ Command command = new Command(ID.getAndIncrement()); command.setType(CommandType.PING); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java index bc5abdad79..e52cef6d92 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java @@ -22,13 +22,24 @@ import io.netty.buffer.Unpooled; import java.io.Serializable; - +/** + * Pong return after ping + */ public class Pong implements Serializable { + /** + * pong body + */ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; + /** + * pong command body + */ private static byte[] EMPTY_BODY_ARRAY = new byte[0]; + /** + * ping byte buffer + */ private static final ByteBuf PONG_BUF; static { @@ -41,10 +52,20 @@ public class Pong implements Serializable { PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); } + /** + * ping content + * @return result + */ public static ByteBuf pingContent(){ return PONG_BUF.duplicate(); } + /** + * package pong command + * + * @param opaque request unique identification + * @return command + */ public static Command create(long opaque){ Command command = new Command(opaque); command.setType(CommandType.PONG); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java index 1a2e6e4dd1..9b064b7136 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java @@ -29,8 +29,14 @@ import java.util.concurrent.atomic.AtomicLong; */ public class GetLogBytesRequestCommand implements Serializable { + /** + * request id + */ private static final AtomicLong REQUEST = new AtomicLong(1); + /** + * log path + */ private String path; public GetLogBytesRequestCommand() { @@ -49,8 +55,9 @@ public class GetLogBytesRequestCommand implements Serializable { } /** + * package request command * - * @return + * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java index 05692fb5c9..deaf9b8d85 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java @@ -28,6 +28,9 @@ import java.io.Serializable; */ public class GetLogBytesResponseCommand implements Serializable { + /** + * log byte data + */ private byte[] data; public GetLogBytesResponseCommand() { @@ -45,6 +48,12 @@ public class GetLogBytesResponseCommand implements Serializable { this.data = data; } + /** + * package response command + * + * @param opaque request unique identification + * @return command + */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.GET_LOG_BYTES_RESPONSE); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java index 49d19aa1f2..f072c479f4 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java @@ -29,12 +29,24 @@ import java.util.concurrent.atomic.AtomicLong; */ public class RollViewLogRequestCommand implements Serializable { + /** + * request id + */ private static final AtomicLong REQUEST = new AtomicLong(1); + /** + * log path + */ private String path; + /** + * skip line number + */ private int skipLineNum; + /** + * query log line number limit + */ private int limit; public RollViewLogRequestCommand() { @@ -70,6 +82,11 @@ public class RollViewLogRequestCommand implements Serializable { this.limit = limit; } + /** + * package request command + * + * @return command + */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.ROLL_VIEW_LOG_REQUEST); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java index def3257073..591d787200 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java @@ -28,6 +28,9 @@ import java.io.Serializable; */ public class RollViewLogResponseCommand implements Serializable { + /** + * response data + */ private String msg; public RollViewLogResponseCommand() { @@ -45,6 +48,12 @@ public class RollViewLogResponseCommand implements Serializable { this.msg = msg; } + /** + * package response command + * + * @param opaque request unique identification + * @return command + */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java index 9ba9cd3c23..5dcefc6233 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java @@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ViewLogRequestCommand implements Serializable { + /** + * request id + */ private static final AtomicLong REQUEST = new AtomicLong(1); private String path; @@ -48,6 +51,11 @@ public class ViewLogRequestCommand implements Serializable { this.path = path; } + /** + * package request command + * + * @return command + */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java index 6e3c799a3d..dffadade26 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java @@ -28,6 +28,9 @@ import java.io.Serializable; */ public class ViewLogResponseCommand implements Serializable { + /** + * response data + */ private String msg; public ViewLogResponseCommand() { @@ -45,6 +48,12 @@ public class ViewLogResponseCommand implements Serializable { this.msg = msg; } + /** + * package response command + * + * @param opaque request unique identification + * @return command + */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java index 56d2643a67..831e05f7e7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java @@ -23,14 +23,29 @@ import org.apache.dolphinscheduler.remote.utils.Constants; */ public class NettyClientConfig { + /** + * worker threads,default get machine cpus + */ private int workerThreads = Constants.CPUS; + /** + * whether tpc delay + */ private boolean tcpNoDelay = true; + /** + * whether keep alive + */ private boolean soKeepalive = true; + /** + * send buffer size + */ private int sendBufferSize = 65535; + /** + * receive buffer size + */ private int receiveBufferSize = 65535; public int getWorkerThreads() { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java index 847f316089..4ec8a0f7a7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java @@ -23,18 +23,39 @@ import org.apache.dolphinscheduler.remote.utils.Constants; */ public class NettyServerConfig { + /** + * init the server connectable queue + */ private int soBacklog = 1024; + /** + * whether tpc delay + */ private boolean tcpNoDelay = true; + /** + * whether keep alive + */ private boolean soKeepalive = true; + /** + * send buffer size + */ private int sendBufferSize = 65535; + /** + * receive buffer size + */ private int receiveBufferSize = 65535; + /** + * worker threads,default get machine cpus + */ private int workerThread = Constants.CPUS; + /** + * listen port + */ private int listenPort = 12346; public int getListenPort() { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index b06308090f..6aceb5a41b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -38,29 +38,62 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); + /** + * netty remote client + */ private final NettyRemotingClient nettyRemotingClient; + /** + * client processors queue + */ private final ConcurrentHashMap> processors = new ConcurrentHashMap(); public NettyClientHandler(NettyRemotingClient nettyRemotingClient){ this.nettyRemotingClient = nettyRemotingClient; } + /** + * When the current channel is not active, + * the current channel has reached the end of its life cycle + * + * @param ctx channel handler context + * @throws Exception + */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } + /** + * The current channel reads data from the remote + * + * @param ctx channel handler context + * @param msg message + * @throws Exception + */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { processReceived(ctx.channel(), (Command)msg); } + /** + * register processor + * + * @param commandType command type + * @param processor processor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor()); } + /** + * register processor + * + * @param commandType command type + * @param processor processor + * @param executor thread executor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; if(executorRef == null){ @@ -69,6 +102,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { this.processors.putIfAbsent(commandType, new Pair(processor, executorRef)); } + /** + * process received logic + * + * @param channel channel + * @param msg message + */ private void processReceived(final Channel channel, final Command msg) { final CommandType commandType = msg.getType(); final Pair pair = processors.get(commandType); @@ -93,6 +132,13 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * caught exception + * + * @param ctx channel handler context + * @param cause cause + * @throws Exception + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("exceptionCaught : {}", cause); @@ -100,6 +146,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ctx.channel().close(); } + /** + * channel write changed + * @param ctx channel handler context + * @throws Exception + */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java index 8a7ee39a77..eabd6560de 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java @@ -39,28 +39,60 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); + /** + * netty remote server + */ private final NettyRemotingServer nettyRemotingServer; + /** + * server processors queue + */ private final ConcurrentHashMap> processors = new ConcurrentHashMap(); public NettyServerHandler(NettyRemotingServer nettyRemotingServer){ this.nettyRemotingServer = nettyRemotingServer; } + /** + * When the current channel is not active, + * the current channel has reached the end of its life cycle + * @param ctx channel handler context + * @throws Exception + */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.channel().close(); } + /** + * The current channel reads data from the remote end + * + * @param ctx channel handler context + * @param msg message + * @throws Exception + */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { processReceived(ctx.channel(), (Command)msg); } + /** + * register processor + * + * @param commandType command type + * @param processor processor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } + /** + * register processor + * + * @param commandType command type + * @param processor processor + * @param executor thread executor + */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; if(executorRef == null){ @@ -69,11 +101,18 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { this.processors.putIfAbsent(commandType, new Pair(processor, executorRef)); } + /** + * process received logic + * @param channel channel + * @param msg message + */ private void processReceived(final Channel channel, final Command msg) { final CommandType commandType = msg.getType(); final Pair pair = processors.get(commandType); if (pair != null) { Runnable r = new Runnable() { + + @Override public void run() { try { pair.getLeft().process(channel, msg); @@ -92,12 +131,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } } + /** + * caught exception + * + * @param ctx channel handler context + * @param cause cause + * @throws Exception + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("exceptionCaught : {}", cause); ctx.channel().close(); } + /** + * channel write changed + * + * @param ctx channel handler context + * @throws Exception + */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java index 10a8195710..6966b53d17 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java @@ -24,5 +24,10 @@ import org.apache.dolphinscheduler.remote.command.Command; */ public interface NettyRequestProcessor { + /** + * process logic + * @param channel channel + * @param command command + */ void process(final Channel channel, final Command command); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java index 221b895cb9..f61dcd615c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java @@ -23,8 +23,14 @@ import java.io.Serializable; */ public class Address implements Serializable { + /** + * host + */ private String host; + /** + * port + */ private int port; public Address(){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java index e9d93da41d..d7af5fe165 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java @@ -25,14 +25,30 @@ import java.net.InetSocketAddress; */ public class ChannelUtils { + /** + * get local address + * + * @param channel channel + * @return local address + */ public static String getLocalAddress(Channel channel){ return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress(); } + /** + * get remote address + * @param channel channel + * @return remote address + */ public static String getRemoteAddress(Channel channel){ return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress(); } + /** + * channel to address + * @param channel channel + * @return address + */ public static Address toAddress(Channel channel){ InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress()); return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort()); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index c0a930ca41..5733b17790 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -18,14 +18,24 @@ package org.apache.dolphinscheduler.remote.utils; import java.nio.charset.Charset; + +/** + * constant + */ public class Constants { public static final String COMMA = ","; public static final String SLASH = "/"; + /** + * charset + */ public static final Charset UTF8 = Charset.forName("UTF-8"); + /** + * cpus + */ public static final int CPUS = Runtime.getRuntime().availableProcessors(); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java index a9b85461ff..e96796a05c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java @@ -23,15 +23,36 @@ import com.alibaba.fastjson.JSON; */ public class FastJsonSerializer { + /** + * serialize to byte + * + * @param obj object + * @param object type + * @return byte array + */ public static byte[] serialize(T obj) { String json = JSON.toJSONString(obj); return json.getBytes(Constants.UTF8); } + /** + * serialize to string + * @param obj object + * @param object type + * @return string + */ public static String serializeToString(T obj) { return JSON.toJSONString(obj); } + /** + * deserialize + * + * @param src byte array + * @param clazz class + * @param deserialize type + * @return deserialize type + */ public static T deserialize(byte[] src, Class clazz) { return JSON.parseObject(new String(src, Constants.UTF8), clazz); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java index a79a3748cd..2042191486 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java @@ -18,6 +18,12 @@ package org.apache.dolphinscheduler.remote.utils; +/** + * key value pair + * + * @param L generic type + * @param R generic type + */ public class Pair { private L left; diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java index 19fd564bd5..6f0a802af6 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java @@ -32,9 +32,15 @@ import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +/** + * netty remote client test + */ public class NettyRemotingClientTest { + /** + * test ping + */ @Test public void testSend(){ NettyServerConfig serverConfig = new NettyServerConfig();