Browse Source

[Improvement][remote] Refactor remote module to fix code smell (#4576)

* chore[remote]: Refactor remote module to fix code smell

* chore[remote]: Add licence header

* chore[remote]: Clean code smell

* chore[remote]: Clean code smell

* chore[remote]: Remove unused import
pull/3/MERGE
Segun Ogundipe 3 years ago committed by GitHub
parent
commit
047b0a312f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 150
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  2. 64
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
  3. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
  4. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
  5. 42
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemoteException.java
  6. 63
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
  7. 34
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
  8. 40
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
  9. 31
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
  10. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
  11. 25
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java
  12. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java

150
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -84,7 +83,7 @@ public class NettyRemotingClient {
/** /**
* channels * channels
*/ */
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128); private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
/** /**
* started flag * started flag
@ -130,7 +129,7 @@ public class NettyRemotingClient {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
if (NettyUtils.useEpoll()) { if (NettyUtils.useEpoll()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -139,7 +138,7 @@ public class NettyRemotingClient {
}); });
} else { } else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -148,8 +147,8 @@ public class NettyRemotingClient {
}); });
} }
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy()); new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
@ -163,40 +162,32 @@ public class NettyRemotingClient {
private void start() { private void start() {
this.bootstrap this.bootstrap
.group(this.workerGroup) .group(this.workerGroup)
.channel(NettyUtils.getSocketChannelClass()) .channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) { public void initChannel(SocketChannel ch) {
ch.pipeline() ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
.addLast(new NettyDecoder(), clientHandler, encoder); .addLast(new NettyDecoder(), clientHandler, encoder);
} }
}); });
this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
@Override
public void run() {
ResponseFuture.scanFutureTable();
}
}, 5000, 1000, TimeUnit.MILLISECONDS);
//
isStarted.compareAndSet(false, true); isStarted.compareAndSet(false, true);
} }
/** /**
* async send * async send
* *
* @param host host * @param host host
* @param command command * @param command command
* @param timeoutMillis timeoutMillis * @param timeoutMillis timeoutMillis
* @param invokeCallback callback function * @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
*/ */
public void sendAsync(final Host host, final Command command, public void sendAsync(final Host host, final Command command,
final long timeoutMillis, final long timeoutMillis,
@ -205,53 +196,49 @@ public class NettyRemotingClient {
if (channel == null) { if (channel == null) {
throw new RemotingException("network error"); throw new RemotingException("network error");
} }
/** /*
* request unique identification * request unique identification
*/ */
final long opaque = command.getOpaque(); final long opaque = command.getOpaque();
/** /*
* control concurrency number * control concurrency number
*/ */
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) { if (acquired) {
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
/** /*
* response future * response future
*/ */
final ResponseFuture responseFuture = new ResponseFuture(opaque, final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis, timeoutMillis,
invokeCallback, invokeCallback,
releaseSemaphore); releaseSemaphore);
try { try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener() { channel.writeAndFlush(command).addListener(future -> {
if (future.isSuccess()) {
@Override responseFuture.setSendOk(true);
public void operationComplete(ChannelFuture future) throws Exception { return;
if (future.isSuccess()) { } else {
responseFuture.setSendOk(true); responseFuture.setSendOk(false);
return; }
} else { responseFuture.setCause(future.cause());
responseFuture.setSendOk(false); responseFuture.putResponse(null);
} try {
responseFuture.setCause(future.cause()); responseFuture.executeInvokeCallback();
responseFuture.putResponse(null); } catch (Exception ex) {
try { logger.error("execute callback error", ex);
responseFuture.executeInvokeCallback(); } finally {
} catch (Throwable ex) { responseFuture.release();
logger.error("execute callback error", ex);
} finally {
responseFuture.release();
}
} }
}); });
} catch (Throwable ex) { } catch (Exception ex) {
responseFuture.release(); responseFuture.release();
throw new RemotingException(String.format("send command to host: %s failed", host), ex); throw new RemotingException(String.format("send command to host: %s failed", host), ex);
} }
} else { } else {
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
throw new RemotingTooMuchRequestException(message); throw new RemotingTooMuchRequestException(message);
} }
} }
@ -259,12 +246,10 @@ public class NettyRemotingClient {
/** /**
* sync send * sync send
* *
* @param host host * @param host host
* @param command command * @param command command
* @param timeoutMillis timeoutMillis * @param timeoutMillis timeoutMillis
* @return command * @return command
* @throws InterruptedException
* @throws RemotingException
*/ */
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException { public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host); final Channel channel = getChannel(host);
@ -273,21 +258,18 @@ public class NettyRemotingClient {
} }
final long opaque = command.getOpaque(); final long opaque = command.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
channel.writeAndFlush(command).addListener(new ChannelFutureListener() { channel.writeAndFlush(command).addListener(future -> {
@Override if (future.isSuccess()) {
public void operationComplete(ChannelFuture future) throws Exception { responseFuture.setSendOk(true);
if (future.isSuccess()) { return;
responseFuture.setSendOk(true); } else {
return; responseFuture.setSendOk(false);
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to host {} failed", command, host);
} }
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to host {} failed", command, host);
}); });
/** /*
* sync wait for result * sync wait for result
*/ */
Command result = responseFuture.waitResponse(); Command result = responseFuture.waitResponse();
@ -304,9 +286,8 @@ public class NettyRemotingClient {
/** /**
* send task * send task
* *
* @param host host * @param host host
* @param command command * @param command command
* @throws RemotingException
*/ */
public void send(final Host host, final Command command) throws RemotingException { public void send(final Host host, final Command command) throws RemotingException {
Channel channel = getChannel(host); Channel channel = getChannel(host);
@ -332,7 +313,7 @@ public class NettyRemotingClient {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
@ -342,8 +323,8 @@ public class NettyRemotingClient {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor); this.clientHandler.registerProcessor(commandType, processor, executor);
@ -351,9 +332,6 @@ public class NettyRemotingClient {
/** /**
* get channel * get channel
*
* @param host
* @return
*/ */
public Channel getChannel(Host host) { public Channel getChannel(Host host) {
Channel channel = channels.get(host); Channel channel = channels.get(host);
@ -366,7 +344,7 @@ public class NettyRemotingClient {
/** /**
* create channel * create channel
* *
* @param host host * @param host host
* @param isSync sync flag * @param isSync sync flag
* @return channel * @return channel
*/ */

64
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemoteException;
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
@ -93,6 +94,11 @@ public class NettyRemotingServer {
*/ */
private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* Netty server bind fail message
*/
private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer bind %s fail";
/** /**
* server init * server init
* *
@ -102,7 +108,7 @@ public class NettyRemotingServer {
this.serverConfig = serverConfig; this.serverConfig = serverConfig;
if (NettyUtils.useEpoll()) { if (NettyUtils.useEpoll()) {
this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() { this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -111,7 +117,7 @@ public class NettyRemotingServer {
}); });
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -120,7 +126,7 @@ public class NettyRemotingServer {
}); });
} else { } else {
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -129,7 +135,7 @@ public class NettyRemotingServer {
}); });
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -145,35 +151,35 @@ public class NettyRemotingServer {
public void start() { public void start() {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap this.serverBootstrap
.group(this.bossGroup, this.workGroup) .group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass()) .channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) {
initNettyChannel(ch); initNettyChannel(ch);
} }
}); });
ChannelFuture future; ChannelFuture future;
try { try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) { } catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
} }
if (future.isSuccess()) { if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) { } else if (future.cause() != null) {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
} else { } else {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
} }
} }
} }
@ -185,17 +191,17 @@ public class NettyRemotingServer {
*/ */
private void initNettyChannel(SocketChannel ch) { private void initNettyChannel(SocketChannel ch) {
ch.pipeline() ch.pipeline()
.addLast("encoder", encoder) .addLast("encoder", encoder)
.addLast("decoder", new NettyDecoder()) .addLast("decoder", new NettyDecoder())
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast("handler", serverHandler); .addLast("handler", serverHandler);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
@ -205,8 +211,8 @@ public class NettyRemotingServer {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor); this.serverHandler.registerProcessor(commandType, processor, executor);

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java

@ -14,16 +14,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.codec; package org.apache.dolphinscheduler.remote.codec;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
/** /**
* netty encoder * netty encoder
*/ */
@Sharable @Sharable
public class NettyEncoder extends MessageToByteEncoder<Command> { public class NettyEncoder extends MessageToByteEncoder<Command> {
@ -34,12 +37,11 @@ public class NettyEncoder extends MessageToByteEncoder<Command> {
* @param ctx channel handler context * @param ctx channel handler context
* @param msg command * @param msg command
* @param out byte buffer * @param out byte buffer
* @throws Exception
*/ */
@Override @Override
protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
if(msg == null){ if (msg == null) {
throw new Exception("encode msg is null"); throw new RemotingException("encode msg is null");
} }
out.writeByte(Command.MAGIC); out.writeByte(Command.MAGIC);
out.writeByte(Command.VERSION); out.writeByte(Command.VERSION);

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java

@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
* kill task response command * kill task response command
*/ */
public class TaskKillResponseCommand implements Serializable { public class TaskKillResponseCommand implements Serializable {
@ -51,7 +51,7 @@ public class TaskKillResponseCommand implements Serializable {
/** /**
* other resource manager appId , for example : YARN etc * other resource manager appId , for example : YARN etc
*/ */
protected List<String> appIds; private List<String> appIds;
public int getTaskInstanceId() { public int getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;
@ -94,7 +94,7 @@ public class TaskKillResponseCommand implements Serializable {
} }
/** /**
* package request command * package request command
* *
* @return command * @return command
*/ */

42
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemoteException.java

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.exceptions;
/**
* Custom runtime exception
*/
public class RemoteException extends RuntimeException {
/**
* Construct a new runtime exception with the detail message
*
* @param message detail message
*/
public RemoteException(String message) {
super(message);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param message detail message
*/
public RemoteException(String message, Throwable cause) {
super(message, cause);
}
}

63
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java

@ -18,14 +18,17 @@
package org.apache.dolphinscheduler.remote.future; package org.apache.dolphinscheduler.remote.future;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* response future * response future
@ -34,25 +37,25 @@ public class ResponseFuture {
private static final Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class); private static final Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class);
private static final ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256); private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
/** /**
* request unique identification * request unique identification
*/ */
private final long opaque; private final long opaque;
/** /**
* timeout * timeout
*/ */
private final long timeoutMillis; private final long timeoutMillis;
/** /**
* invokeCallback function * invokeCallback function
*/ */
private final InvokeCallback invokeCallback; private final InvokeCallback invokeCallback;
/** /**
* releaseSemaphore * releaseSemaphore
*/ */
private final ReleaseSemaphore releaseSemaphore; private final ReleaseSemaphore releaseSemaphore;
@ -61,7 +64,7 @@ public class ResponseFuture {
private final long beginTimestamp = System.currentTimeMillis(); private final long beginTimestamp = System.currentTimeMillis();
/** /**
* response command * response command
*/ */
private Command responseCommand; private Command responseCommand;
@ -78,10 +81,9 @@ public class ResponseFuture {
} }
/** /**
* wait for response * wait for response
* *
* @return command * @return command
* @throws InterruptedException
*/ */
public Command waitResponse() throws InterruptedException { public Command waitResponse() throws InterruptedException {
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
@ -89,7 +91,7 @@ public class ResponseFuture {
} }
/** /**
* put response * put response
* *
* @param responseCommand responseCommand * @param responseCommand responseCommand
*/ */
@ -99,12 +101,13 @@ public class ResponseFuture {
FUTURE_TABLE.remove(opaque); FUTURE_TABLE.remove(opaque);
} }
public static ResponseFuture getFuture(long opaque){ public static ResponseFuture getFuture(long opaque) {
return FUTURE_TABLE.get(opaque); return FUTURE_TABLE.get(opaque);
} }
/** /**
* whether timeout * whether timeout
*
* @return timeout * @return timeout
*/ */
public boolean isTimeout() { public boolean isTimeout() {
@ -113,7 +116,7 @@ public class ResponseFuture {
} }
/** /**
* execute invoke callback * execute invoke callback
*/ */
public void executeInvokeCallback() { public void executeInvokeCallback() {
if (invokeCallback != null) { if (invokeCallback != null) {
@ -162,10 +165,10 @@ public class ResponseFuture {
} }
/** /**
* release * release
*/ */
public void release() { public void release() {
if(this.releaseSemaphore != null){ if (this.releaseSemaphore != null) {
this.releaseSemaphore.release(); this.releaseSemaphore.release();
} }
} }
@ -173,7 +176,7 @@ public class ResponseFuture {
/** /**
* scan future table * scan future table
*/ */
public static void scanFutureTable(){ public static void scanFutureTable() {
final List<ResponseFuture> futureList = new LinkedList<>(); final List<ResponseFuture> futureList = new LinkedList<>();
Iterator<Map.Entry<Long, ResponseFuture>> it = FUTURE_TABLE.entrySet().iterator(); Iterator<Map.Entry<Long, ResponseFuture>> it = FUTURE_TABLE.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
@ -189,7 +192,7 @@ public class ResponseFuture {
try { try {
future.release(); future.release();
future.executeInvokeCallback(); future.executeInvokeCallback();
} catch (Throwable ex) { } catch (Exception ex) {
LOGGER.warn("scanFutureTable, execute callback error", ex); LOGGER.warn("scanFutureTable, execute callback error", ex);
} }
} }
@ -197,16 +200,16 @@ public class ResponseFuture {
@Override @Override
public String toString() { public String toString() {
return "ResponseFuture{" + return "ResponseFuture{"
"opaque=" + opaque + + "opaque=" + opaque
", timeoutMillis=" + timeoutMillis + + ", timeoutMillis=" + timeoutMillis
", invokeCallback=" + invokeCallback + + ", invokeCallback=" + invokeCallback
", releaseSemaphore=" + releaseSemaphore + + ", releaseSemaphore=" + releaseSemaphore
", latch=" + latch + + ", latch=" + latch
", beginTimestamp=" + beginTimestamp + + ", beginTimestamp=" + beginTimestamp
", responseCommand=" + responseCommand + + ", responseCommand=" + responseCommand
", sendOk=" + sendOk + + ", sendOk=" + sendOk
", cause=" + cause + + ", cause=" + cause
'}'; + '}';
} }
} }

34
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java

@ -74,7 +74,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) { public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor; this.callbackExecutor = callbackExecutor;
this.processors = new ConcurrentHashMap(); this.processors = new ConcurrentHashMap<>();
} }
/** /**
@ -82,10 +82,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* the current channel has reached the end of its life cycle * the current channel has reached the end of its life cycle
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception
*/ */
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) {
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }
@ -95,10 +94,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param msg message * @param msg message
* @throws Exception
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (Command) msg); processReceived(ctx.channel(), (Command) msg);
} }
@ -106,7 +104,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
@ -116,8 +114,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor; ExecutorService executorRef = executor;
@ -138,12 +136,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
future.setResponseCommand(command); future.setResponseCommand(command);
future.release(); future.release();
if (future.getInvokeCallback() != null) { if (future.getInvokeCallback() != null) {
this.callbackExecutor.submit(new Runnable() { this.callbackExecutor.submit(future::executeInvokeCallback);
@Override
public void run() {
future.executeInvokeCallback();
}
});
} else { } else {
future.putResponse(command); future.putResponse(command);
} }
@ -158,7 +151,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
Runnable run = () -> { Runnable run = () -> {
try { try {
pair.getLeft().process(channel, command); pair.getLeft().process(channel, command);
} catch (Throwable e) { } catch (Exception e) {
logger.error(String.format("process command %s exception", command), e); logger.error(String.format("process command %s exception", command), e);
} }
}; };
@ -175,13 +168,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/** /**
* caught exception * caught exception
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param cause cause * @param cause cause
* @throws Exception
*/ */
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("exceptionCaught : {}", cause); logger.error("exceptionCaught : {}", cause.getMessage(), cause);
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }
@ -193,11 +185,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
heartBeat.setType(CommandType.HEART_BEAT); heartBeat.setType(CommandType.HEART_BEAT);
heartBeat.setBody(heartBeatData); heartBeat.setBody(heartBeatData);
ctx.writeAndFlush(heartBeat) ctx.writeAndFlush(heartBeat)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else { } else {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }
} }
} }

40
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java

@ -55,7 +55,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** /**
* server processors queue * server processors queue
*/ */
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap(); private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();
public NettyServerHandler(NettyRemotingServer nettyRemotingServer) { public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
this.nettyRemotingServer = nettyRemotingServer; this.nettyRemotingServer = nettyRemotingServer;
@ -66,10 +66,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* the current channel has reached the end of its life cycle * the current channel has reached the end of its life cycle
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception
*/ */
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) {
ctx.channel().close(); ctx.channel().close();
} }
@ -78,10 +77,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param msg message * @param msg message
* @throws Exception
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (Command) msg); processReceived(ctx.channel(), (Command) msg);
} }
@ -89,7 +87,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
@ -99,8 +97,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor; ExecutorService executorRef = executor;
@ -114,21 +112,17 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* process received logic * process received logic
* *
* @param channel channel * @param channel channel
* @param msg message * @param msg message
*/ */
private void processReceived(final Channel channel, final Command msg) { private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType(); final CommandType commandType = msg.getType();
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType); final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) { if (pair != null) {
Runnable r = new Runnable() { Runnable r = () -> {
try {
@Override pair.getLeft().process(channel, msg);
public void run() { } catch (Exception ex) {
try { logger.error("process msg {} error", msg, ex);
pair.getLeft().process(channel, msg);
} catch (Throwable ex) {
logger.error("process msg {} error", msg, ex);
}
} }
}; };
try { try {
@ -144,9 +138,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** /**
* caught exception * caught exception
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param cause cause * @param cause cause
* @throws Exception
*/ */
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
@ -158,7 +151,6 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* channel write changed * channel write changed
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception
*/ */
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
@ -168,14 +160,14 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (!ch.isWritable()) { if (!ch.isWritable()) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}", logger.warn("{} is not writable, over high water level : {}",
ch, config.getWriteBufferHighWaterMark()); ch, config.getWriteBufferHighWaterMark());
} }
config.setAutoRead(false); config.setAutoRead(false);
} else { } else {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}", logger.warn("{} is writable, to low water : {}",
ch, config.getWriteBufferLowWaterMark()); ch, config.getWriteBufferLowWaterMark());
} }
config.setAutoRead(true); config.setAutoRead(true);
} }
@ -189,4 +181,4 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }
} }
} }

31
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java

@ -14,43 +14,50 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.utils;
import io.netty.channel.Channel; package org.apache.dolphinscheduler.remote.utils;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import io.netty.channel.Channel;
/** /**
* channel utils * channel utils
*/ */
public class ChannelUtils { public class ChannelUtils {
private ChannelUtils() {
throw new IllegalStateException(ChannelUtils.class.getName());
}
/** /**
* get local address * get local address
* *
* @param channel channel * @param channel channel
* @return local address * @return local address
*/ */
public static String getLocalAddress(Channel channel){ public static String getLocalAddress(Channel channel) {
return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress(); return ((InetSocketAddress) channel.localAddress()).getAddress().getHostAddress();
} }
/** /**
* get remote address * get remote address
*
* @param channel channel * @param channel channel
* @return remote address * @return remote address
*/ */
public static String getRemoteAddress(Channel channel){ public static String getRemoteAddress(Channel channel) {
return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress(); return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
} }
/** /**
* channel to address * channel to address
*
* @param channel channel * @param channel channel
* @return address * @return address
*/ */
public static Host toAddress(Channel channel){ public static Host toAddress(Channel channel) {
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress()); InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress());
return new Host(socketAddress.getAddress().getHostAddress(), socketAddress.getPort()); return new Host(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
} }

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@ -25,6 +25,10 @@ import java.nio.charset.StandardCharsets;
*/ */
public class Constants { public class Constants {
private Constants() {
throw new IllegalStateException(Constants.class.getName());
}
public static final String COMMA = ","; public static final String COMMA = ",";
public static final String SLASH = "/"; public static final String SLASH = "/";

25
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java

@ -16,6 +16,8 @@
*/ */
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
import org.apache.dolphinscheduler.remote.exceptions.RemoteException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -28,22 +30,24 @@ import java.util.regex.Pattern;
public class IPUtils { public class IPUtils {
private static final Logger logger = LoggerFactory.getLogger(IPUtils.class); private IPUtils() {
throw new IllegalStateException(IPUtils.class.getName());
}
private static String IP_REGEX = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}"; private static final Logger logger = LoggerFactory.getLogger(IPUtils.class);
private static String LOCAL_HOST = "unknown"; private static String localHost = "unknown";
static { static {
String host = System.getenv("HOSTNAME"); String host = System.getenv("HOSTNAME");
if (isNotEmpty(host)) { if (isNotEmpty(host)) {
LOCAL_HOST = host; localHost = host;
} else { } else {
try { try {
String hostName = InetAddress.getLocalHost().getHostName(); String hostName = InetAddress.getLocalHost().getHostName();
if (isNotEmpty(hostName)) { if (isNotEmpty(hostName)) {
LOCAL_HOST = hostName; localHost = hostName;
} }
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
logger.error("get hostName error!", e); logger.error("get hostName error!", e);
@ -52,7 +56,7 @@ public class IPUtils {
} }
public static String getLocalHost() { public static String getLocalHost() {
return LOCAL_HOST; return localHost;
} }
@ -100,7 +104,7 @@ public class IPUtils {
return addresses; return addresses;
} catch (SocketException e) { } catch (SocketException e) {
throw new RuntimeException(e.getMessage(), e); throw new RemoteException(e.getMessage(), e);
} }
} }
@ -131,12 +135,11 @@ public class IPUtils {
return false; return false;
} }
Pattern pat = Pattern.compile(IP_REGEX); String ipRegex = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}";
Pattern pat = Pattern.compile(ipRegex);
Matcher mat = pat.matcher(addr); Matcher mat = pat.matcher(addr);
boolean ipAddress = mat.find(); return mat.find();
return ipAddress;
} }
} }

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
/** /**
* key value pair * key value pair
* *
@ -51,7 +50,7 @@ public class Pair<L, R> {
this.right = right; this.right = right;
} }
public static <L, R> Pair of(L left, R right){ public static <L, R> Pair<L, R> of(L left, R right) {
return new Pair(left, right); return new Pair<>(left, right);
} }
} }

Loading…
Cancel
Save