Browse Source

Merge pull request #60 in CORE/base-third from ~JU/base-third:feature/10.0 to feature/10.0

* commit '337498b9f17b699e5f47c682d9f3ad7db70f9f23':
  CORE-76 Activator重复启动内置服务器、切换环境的支持 切环境时要关闭socketio,等待的话影响比较大
10.0
superman 7 years ago
parent
commit
7283a30e3d
  1. 132
      fine-socketio/src/com/fr/third/socketio/SocketIOServer.java

132
fine-socketio/src/com/fr/third/socketio/SocketIOServer.java

@ -1,12 +1,12 @@
/** /**
* Copyright 2012 Nikita Koksharov * Copyright 2012 Nikita Koksharov
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -37,6 +37,7 @@ import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,43 +47,48 @@ import com.fr.third.socketio.namespace.NamespacesHub;
/** /**
* Fully thread-safe. * Fully thread-safe.
*
*/ */
public class SocketIOServer implements ClientListeners { public class SocketIOServer implements ClientListeners {
private static final Logger log = LoggerFactory.getLogger(SocketIOServer.class); private static final Logger log = LoggerFactory.getLogger(SocketIOServer.class);
private final Configuration configCopy; private final Configuration configCopy;
private final Configuration configuration; private final Configuration configuration;
private final NamespacesHub namespacesHub; private final NamespacesHub namespacesHub;
private final SocketIONamespace mainNamespace; private final SocketIONamespace mainNamespace;
private SocketIOChannelInitializer pipelineFactory = new SocketIOChannelInitializer(); private SocketIOChannelInitializer pipelineFactory = new SocketIOChannelInitializer();
private EventLoopGroup bossGroup; private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
public SocketIOServer(Configuration configuration) { public SocketIOServer(Configuration configuration) {
this.configuration = configuration; this.configuration = configuration;
this.configCopy = new Configuration(configuration); this.configCopy = new Configuration(configuration);
namespacesHub = new NamespacesHub(configCopy); namespacesHub = new NamespacesHub(configCopy);
mainNamespace = addNamespace(Namespace.DEFAULT_NAME); mainNamespace = addNamespace(Namespace.DEFAULT_NAME);
} }
public void setPipelineFactory(SocketIOChannelInitializer pipelineFactory) { public void setPipelineFactory(SocketIOChannelInitializer pipelineFactory) {
this.pipelineFactory = pipelineFactory; this.pipelineFactory = pipelineFactory;
} }
/** /**
* Get all clients connected to default namespace * Get all clients connected to default namespace
* *
* @return clients collection * @return clients collection
*/ */
public Collection<SocketIOClient> getAllClients() { public Collection<SocketIOClient> getAllClients() {
return namespacesHub.get(Namespace.DEFAULT_NAME).getAllClients(); return namespacesHub.get(Namespace.DEFAULT_NAME).getAllClients();
} }
/** /**
* Get client by uuid from default namespace * Get client by uuid from default namespace
* *
@ -90,22 +96,25 @@ public class SocketIOServer implements ClientListeners {
* @return client * @return client
*/ */
public SocketIOClient getClient(UUID uuid) { public SocketIOClient getClient(UUID uuid) {
return namespacesHub.get(Namespace.DEFAULT_NAME).getClient(uuid); return namespacesHub.get(Namespace.DEFAULT_NAME).getClient(uuid);
} }
/** /**
* Get all namespaces * Get all namespaces
* *
* @return namespaces collection * @return namespaces collection
*/ */
public Collection<SocketIONamespace> getAllNamespaces() { public Collection<SocketIONamespace> getAllNamespaces() {
return namespacesHub.getAllNamespaces(); return namespacesHub.getAllNamespaces();
} }
public BroadcastOperations getBroadcastOperations() { public BroadcastOperations getBroadcastOperations() {
return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory()); return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory());
} }
/** /**
* Get broadcast operations for clients within * Get broadcast operations for clients within
* room by <code>room</code> name * room by <code>room</code> name
@ -114,47 +123,52 @@ public class SocketIOServer implements ClientListeners {
* @return broadcast operations * @return broadcast operations
*/ */
public BroadcastOperations getRoomOperations(String room) { public BroadcastOperations getRoomOperations(String room) {
Iterable<SocketIOClient> clients = namespacesHub.getRoomClients(room); Iterable<SocketIOClient> clients = namespacesHub.getRoomClients(room);
return new BroadcastOperations(clients, configCopy.getStoreFactory()); return new BroadcastOperations(clients, configCopy.getStoreFactory());
} }
/** /**
* Start server * Start server
*/ */
public void start() { public void start() {
startAsync().syncUninterruptibly(); startAsync().syncUninterruptibly();
} }
/** /**
* Start server asynchronously * Start server asynchronously
* *
* @return void * @return void
*/ */
public Future<Void> startAsync() { public Future<Void> startAsync() {
log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory()); log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
initGroups(); initGroups();
pipelineFactory.start(configCopy, namespacesHub); pipelineFactory.start(configCopy, namespacesHub);
Class<? extends ServerChannel> channelClass = NioServerSocketChannel.class; Class<? extends ServerChannel> channelClass = NioServerSocketChannel.class;
if (configCopy.isUseLinuxNativeEpoll()) { if (configCopy.isUseLinuxNativeEpoll()) {
channelClass = EpollServerSocketChannel.class; channelClass = EpollServerSocketChannel.class;
} }
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) b.group(bossGroup, workerGroup)
.channel(channelClass) .channel(channelClass)
.childHandler(pipelineFactory); .childHandler(pipelineFactory);
applyConnectionOptions(b); applyConnectionOptions(b);
InetSocketAddress addr = new InetSocketAddress(configCopy.getPort()); InetSocketAddress addr = new InetSocketAddress(configCopy.getPort());
if (configCopy.getHostname() != null) { if (configCopy.getHostname() != null) {
addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort()); addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort());
} }
return b.bind(addr).addListener(new FutureListener<Void>() { return b.bind(addr).addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
log.info("SocketIO server started at port: {}", configCopy.getPort()); log.info("SocketIO server started at port: {}", configCopy.getPort());
} else { } else {
@ -163,8 +177,9 @@ public class SocketIOServer implements ClientListeners {
} }
}); });
} }
protected void applyConnectionOptions(ServerBootstrap bootstrap) { protected void applyConnectionOptions(ServerBootstrap bootstrap) {
SocketConfig config = configCopy.getSocketConfig(); SocketConfig config = configCopy.getSocketConfig();
bootstrap.childOption(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()); bootstrap.childOption(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
if (config.getTcpSendBufferSize() != -1) { if (config.getTcpSendBufferSize() != -1) {
@ -176,12 +191,13 @@ public class SocketIOServer implements ClientListeners {
} }
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, config.isTcpKeepAlive()); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, config.isTcpKeepAlive());
bootstrap.childOption(ChannelOption.SO_LINGER, config.getSoLinger()); bootstrap.childOption(ChannelOption.SO_LINGER, config.getSoLinger());
bootstrap.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress()); bootstrap.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress());
bootstrap.option(ChannelOption.SO_BACKLOG, config.getAcceptBackLog()); bootstrap.option(ChannelOption.SO_BACKLOG, config.getAcceptBackLog());
} }
protected void initGroups() { protected void initGroups() {
if (configCopy.isUseLinuxNativeEpoll()) { if (configCopy.isUseLinuxNativeEpoll()) {
bossGroup = new EpollEventLoopGroup(configCopy.getBossThreads()); bossGroup = new EpollEventLoopGroup(configCopy.getBossThreads());
workerGroup = new EpollEventLoopGroup(configCopy.getWorkerThreads()); workerGroup = new EpollEventLoopGroup(configCopy.getWorkerThreads());
@ -190,30 +206,47 @@ public class SocketIOServer implements ClientListeners {
workerGroup = new NioEventLoopGroup(configCopy.getWorkerThreads()); workerGroup = new NioEventLoopGroup(configCopy.getWorkerThreads());
} }
} }
/** /**
* Stop server * Stop server
*/ */
public void stop() { public void stop() {
bossGroup.shutdownGracefully().syncUninterruptibly(); bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly(); workerGroup.shutdownGracefully().syncUninterruptibly();
pipelineFactory.stop(); pipelineFactory.stop();
log.info("SocketIO server stopped"); log.info("SocketIO server stopped");
} }
/**
* Stop server
* 不等待
*/
public void stopImmediately() {
bossGroup.shutdownGracefully(0L, 1L, TimeUnit.SECONDS).syncUninterruptibly();
workerGroup.shutdownGracefully(0L, 1L, TimeUnit.SECONDS).syncUninterruptibly();
pipelineFactory.stop();
log.info("SocketIO server stopped");
}
public SocketIONamespace addNamespace(String name) { public SocketIONamespace addNamespace(String name) {
return namespacesHub.create(name); return namespacesHub.create(name);
} }
public SocketIONamespace getNamespace(String name) { public SocketIONamespace getNamespace(String name) {
return namespacesHub.get(name); return namespacesHub.get(name);
} }
public void removeNamespace(String name) { public void removeNamespace(String name) {
namespacesHub.remove(name); namespacesHub.remove(name);
} }
/** /**
* Allows to get configuration provided * Allows to get configuration provided
* during server creation. Further changes on * during server creation. Further changes on
@ -222,48 +255,57 @@ public class SocketIOServer implements ClientListeners {
* @return Configuration object * @return Configuration object
*/ */
public Configuration getConfiguration() { public Configuration getConfiguration() {
return configuration; return configuration;
} }
@Override @Override
public void addMultiTypeEventListener(String eventName, MultiTypeEventListener listener, Class<?>... eventClass) { public void addMultiTypeEventListener(String eventName, MultiTypeEventListener listener, Class<?>... eventClass) {
mainNamespace.addMultiTypeEventListener(eventName, listener, eventClass); mainNamespace.addMultiTypeEventListener(eventName, listener, eventClass);
} }
@Override @Override
public <T> void addEventListener(String eventName, Class<T> eventClass, DataListener<T> listener) { public <T> void addEventListener(String eventName, Class<T> eventClass, DataListener<T> listener) {
mainNamespace.addEventListener(eventName, eventClass, listener); mainNamespace.addEventListener(eventName, eventClass, listener);
} }
@Override @Override
public void removeAllListeners(String eventName) { public void removeAllListeners(String eventName) {
mainNamespace.removeAllListeners(eventName); mainNamespace.removeAllListeners(eventName);
} }
@Override @Override
public void addDisconnectListener(DisconnectListener listener) { public void addDisconnectListener(DisconnectListener listener) {
mainNamespace.addDisconnectListener(listener); mainNamespace.addDisconnectListener(listener);
} }
@Override @Override
public void addConnectListener(ConnectListener listener) { public void addConnectListener(ConnectListener listener) {
mainNamespace.addConnectListener(listener); mainNamespace.addConnectListener(listener);
} }
@Override @Override
public void addPingListener(PingListener listener) { public void addPingListener(PingListener listener) {
mainNamespace.addPingListener(listener); mainNamespace.addPingListener(listener);
} }
@Override @Override
public void addListeners(Object listeners) { public void addListeners(Object listeners) {
mainNamespace.addListeners(listeners); mainNamespace.addListeners(listeners);
} }
@Override @Override
public void addListeners(Object listeners, Class<?> listenersClass) { public void addListeners(Object listeners, Class<?> listenersClass) {
mainNamespace.addListeners(listeners, listenersClass); mainNamespace.addListeners(listeners, listenersClass);
} }
} }

Loading…
Cancel
Save