diff --git a/fine-socketio/src/com/fr/third/socketio/SocketIOServer.java b/fine-socketio/src/com/fr/third/socketio/SocketIOServer.java index 9c9cd5e1a..329448eb0 100644 --- a/fine-socketio/src/com/fr/third/socketio/SocketIOServer.java +++ b/fine-socketio/src/com/fr/third/socketio/SocketIOServer.java @@ -1,12 +1,12 @@ /** * Copyright 2012 Nikita Koksharov - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -37,6 +37,7 @@ import io.netty.util.concurrent.FutureListener; import java.net.InetSocketAddress; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,43 +47,48 @@ import com.fr.third.socketio.namespace.NamespacesHub; /** * Fully thread-safe. - * */ public class SocketIOServer implements ClientListeners { - + private static final Logger log = LoggerFactory.getLogger(SocketIOServer.class); - + private final Configuration configCopy; + private final Configuration configuration; - + private final NamespacesHub namespacesHub; + private final SocketIONamespace mainNamespace; - + private SocketIOChannelInitializer pipelineFactory = new SocketIOChannelInitializer(); - + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; - + public SocketIOServer(Configuration configuration) { + this.configuration = configuration; this.configCopy = new Configuration(configuration); namespacesHub = new NamespacesHub(configCopy); mainNamespace = addNamespace(Namespace.DEFAULT_NAME); } - + public void setPipelineFactory(SocketIOChannelInitializer pipelineFactory) { + this.pipelineFactory = pipelineFactory; } - + /** * Get all clients connected to default namespace * * @return clients collection */ public Collection getAllClients() { + return namespacesHub.get(Namespace.DEFAULT_NAME).getAllClients(); } - + /** * Get client by uuid from default namespace * @@ -90,22 +96,25 @@ public class SocketIOServer implements ClientListeners { * @return client */ public SocketIOClient getClient(UUID uuid) { + return namespacesHub.get(Namespace.DEFAULT_NAME).getClient(uuid); } - + /** * Get all namespaces * * @return namespaces collection */ public Collection getAllNamespaces() { + return namespacesHub.getAllNamespaces(); } - + public BroadcastOperations getBroadcastOperations() { + return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory()); } - + /** * Get broadcast operations for clients within * room by room name @@ -114,47 +123,52 @@ public class SocketIOServer implements ClientListeners { * @return broadcast operations */ public BroadcastOperations getRoomOperations(String room) { + Iterable clients = namespacesHub.getRoomClients(room); return new BroadcastOperations(clients, configCopy.getStoreFactory()); } - + /** * Start server */ public void start() { + startAsync().syncUninterruptibly(); } - + /** * Start server asynchronously - * + * * @return void */ public Future startAsync() { + log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory()); initGroups(); - + pipelineFactory.start(configCopy, namespacesHub); - + Class channelClass = NioServerSocketChannel.class; if (configCopy.isUseLinuxNativeEpoll()) { channelClass = EpollServerSocketChannel.class; } - + ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) - .channel(channelClass) - .childHandler(pipelineFactory); + .channel(channelClass) + .childHandler(pipelineFactory); applyConnectionOptions(b); - + InetSocketAddress addr = new InetSocketAddress(configCopy.getPort()); if (configCopy.getHostname() != null) { addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort()); } - + return b.bind(addr).addListener(new FutureListener() { + @Override public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { log.info("SocketIO server started at port: {}", configCopy.getPort()); } else { @@ -163,8 +177,9 @@ public class SocketIOServer implements ClientListeners { } }); } - + protected void applyConnectionOptions(ServerBootstrap bootstrap) { + SocketConfig config = configCopy.getSocketConfig(); bootstrap.childOption(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()); if (config.getTcpSendBufferSize() != -1) { @@ -176,12 +191,13 @@ public class SocketIOServer implements ClientListeners { } bootstrap.childOption(ChannelOption.SO_KEEPALIVE, config.isTcpKeepAlive()); bootstrap.childOption(ChannelOption.SO_LINGER, config.getSoLinger()); - + bootstrap.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress()); bootstrap.option(ChannelOption.SO_BACKLOG, config.getAcceptBackLog()); } - + protected void initGroups() { + if (configCopy.isUseLinuxNativeEpoll()) { bossGroup = new EpollEventLoopGroup(configCopy.getBossThreads()); workerGroup = new EpollEventLoopGroup(configCopy.getWorkerThreads()); @@ -190,30 +206,47 @@ public class SocketIOServer implements ClientListeners { workerGroup = new NioEventLoopGroup(configCopy.getWorkerThreads()); } } - + /** * Stop server */ public void stop() { + bossGroup.shutdownGracefully().syncUninterruptibly(); workerGroup.shutdownGracefully().syncUninterruptibly(); - + pipelineFactory.stop(); log.info("SocketIO server stopped"); } - + + /** + * Stop server + * 不等待 + */ + public void stopImmediately() { + + bossGroup.shutdownGracefully(0L, 1L, TimeUnit.SECONDS).syncUninterruptibly(); + workerGroup.shutdownGracefully(0L, 1L, TimeUnit.SECONDS).syncUninterruptibly(); + + pipelineFactory.stop(); + log.info("SocketIO server stopped"); + } + public SocketIONamespace addNamespace(String name) { + return namespacesHub.create(name); } - + public SocketIONamespace getNamespace(String name) { + return namespacesHub.get(name); } - + public void removeNamespace(String name) { + namespacesHub.remove(name); } - + /** * Allows to get configuration provided * during server creation. Further changes on @@ -222,48 +255,57 @@ public class SocketIOServer implements ClientListeners { * @return Configuration object */ public Configuration getConfiguration() { + return configuration; } - + @Override public void addMultiTypeEventListener(String eventName, MultiTypeEventListener listener, Class... eventClass) { + mainNamespace.addMultiTypeEventListener(eventName, listener, eventClass); } - + @Override public void addEventListener(String eventName, Class eventClass, DataListener listener) { + mainNamespace.addEventListener(eventName, eventClass, listener); } - + @Override public void removeAllListeners(String eventName) { + mainNamespace.removeAllListeners(eventName); } - + @Override public void addDisconnectListener(DisconnectListener listener) { + mainNamespace.addDisconnectListener(listener); } - + @Override public void addConnectListener(ConnectListener listener) { + mainNamespace.addConnectListener(listener); } - + @Override public void addPingListener(PingListener listener) { + mainNamespace.addPingListener(listener); } - + @Override public void addListeners(Object listeners) { + mainNamespace.addListeners(listeners); } @Override public void addListeners(Object listeners, Class listenersClass) { + mainNamespace.addListeners(listeners, listenersClass); } - - + + }