Browse Source

[Test][remote]update netty util test and fix init channel error (#3671)

pull/3/MERGE
CalvinKirs 4 years ago committed by GitHub
parent
commit
86bcc058a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
  2. 12
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java

33
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@ -17,16 +17,6 @@
package org.apache.dolphinscheduler.remote; package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -36,15 +26,25 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
/** /**
* remoting netty server * remoting netty server
*/ */
@ -152,10 +152,10 @@ public class NettyRemotingServer {
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(NioSocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
initNettyChannel(ch); initNettyChannel(ch);
} }
}); });
@ -181,9 +181,8 @@ public class NettyRemotingServer {
* init netty channel * init netty channel
* *
* @param ch socket channel * @param ch socket channel
* @throws Exception
*/ */
private void initNettyChannel(NioSocketChannel ch) throws Exception { private void initNettyChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder); pipeline.addLast("encoder", encoder);
pipeline.addLast("decoder", new NettyDecoder()); pipeline.addLast("decoder", new NettyDecoder());

12
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java

@ -17,20 +17,28 @@
package org.apache.dolphinscheduler.remote; package org.apache.dolphinscheduler.remote;
import static org.apache.dolphinscheduler.remote.utils.Constants.OS_NAME;
import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import io.netty.channel.epoll.Epoll;
/** /**
* NettyUtilTest * NettyUtilTest
*/ */
public class NettyUtilTest { public class NettyUtilTest {
@Test @Test
public void testUserEpoll() { public void testUserEpoll() {
System.setProperty("netty.epoll.enable", "false"); if (OS_NAME.toLowerCase().contains("linux") && Epoll.isAvailable()) {
Assert.assertFalse(NettyUtils.useEpoll()); Assert.assertTrue(NettyUtils.useEpoll());
} else {
Assert.assertFalse(NettyUtils.useEpoll());
}
} }
} }

Loading…
Cancel
Save