From a791810fec54bd4f94966c88dfc145803f2a01e9 Mon Sep 17 00:00:00 2001 From: lidongy <1879087903@qq.com> Date: Mon, 22 Nov 2021 09:23:47 +0800 Subject: [PATCH] =?UTF-8?q?KERNEL-9700=20update:=20netty-socket.io?= =?UTF-8?q?=E6=A1=86=E6=9E=B6=E9=9C=80=E8=A6=81=E5=8D=87=E7=BA=A7=E5=88=B0?= =?UTF-8?q?=E6=9C=80=E6=96=B0=E7=9A=84=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/fr/third/socketio/AckCallback.java | 4 +- .../third/socketio/BroadcastOperations.java | 117 ++--------------- .../MultiRoomBroadcastOperations.java | 117 +++++++++++++++++ .../SingleRoomBroadcastOperations.java | 123 ++++++++++++++++++ .../socketio/SocketIOChannelInitializer.java | 6 +- .../com/fr/third/socketio/SocketIOServer.java | 33 +++-- .../com/fr/third/socketio/ack/AckManager.java | 6 +- .../socketio/handler/AuthorizeHandler.java | 12 +- .../fr/third/socketio/handler/ClientsBox.java | 3 +- .../socketio/handler/InPacketHandler.java | 2 +- .../third/socketio/listener/DataListener.java | 3 +- .../listener/DefaultExceptionListener.java | 3 +- .../socketio/listener/ExceptionListener.java | 3 +- .../listener/ExceptionListenerAdapter.java | 3 +- .../socketio/messages/PacketsMessage.java | 3 +- .../socketio/misc/CompositeIterable.java | 34 +---- .../socketio/misc/CompositeIterator.java | 55 ++++++++ .../third/socketio/namespace/Namespace.java | 10 +- .../socketio/namespace/NamespacesHub.java | 9 +- .../com/fr/third/socketio/protocol/Event.java | 2 +- .../socketio/protocol/JacksonJsonSupport.java | 4 +- .../third/socketio/protocol/JsonSupport.java | 3 +- .../fr/third/socketio/protocol/Packet.java | 25 ++++ .../socketio/protocol/PacketDecoder.java | 34 +++-- .../socketio/protocol/PacketEncoder.java | 3 +- .../socketio/protocol/UTF8CharsScanner.java | 24 ---- .../store/pubsub/BaseStoreFactory.java | 2 +- .../socketio/transport/BaseTransport.java | 25 ---- .../socketio/transport/NamespaceClient.java | 4 +- .../transport/WebSocketTransport.java | 1 + 30 files changed, 414 insertions(+), 259 deletions(-) create mode 100644 fine-socketio/src/main/java/com/fr/third/socketio/MultiRoomBroadcastOperations.java create mode 100644 fine-socketio/src/main/java/com/fr/third/socketio/SingleRoomBroadcastOperations.java create mode 100644 fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterator.java delete mode 100644 fine-socketio/src/main/java/com/fr/third/socketio/transport/BaseTransport.java diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/AckCallback.java b/fine-socketio/src/main/java/com/fr/third/socketio/AckCallback.java index ba7a5a6b9..800334c80 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/AckCallback.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/AckCallback.java @@ -31,8 +31,8 @@ package com.fr.third.socketio; * * @param - any serializable type * - * @see VoidAckCallback - * @see MultiTypeAckCallback + * @see com.fr.third.socketio.VoidAckCallback + * @see com.fr.third.socketio.MultiTypeAckCallback * */ public abstract class AckCallback { diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/BroadcastOperations.java b/fine-socketio/src/main/java/com/fr/third/socketio/BroadcastOperations.java index dd011d0bb..b308be864 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/BroadcastOperations.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/BroadcastOperations.java @@ -15,123 +15,24 @@ */ package com.fr.third.socketio; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import com.fr.third.socketio.misc.IterableCollection; -import com.fr.third.socketio.namespace.Namespace; import com.fr.third.socketio.protocol.Packet; -import com.fr.third.socketio.protocol.PacketType; -import com.fr.third.socketio.store.StoreFactory; -import com.fr.third.socketio.store.pubsub.DispatchMessage; -import com.fr.third.socketio.store.pubsub.PubSubType; + +import java.util.Collection; /** - * Fully thread-safe. + * broadcast interface * */ -public class BroadcastOperations implements ClientOperations { - - private final Iterable clients; - private final StoreFactory storeFactory; - - public BroadcastOperations(Iterable clients, StoreFactory storeFactory) { - super(); - this.clients = clients; - this.storeFactory = storeFactory; - } - - private void dispatch(Packet packet) { - Map> namespaceRooms = new HashMap>(); - for (SocketIOClient socketIOClient : clients) { - Namespace namespace = (Namespace)socketIOClient.getNamespace(); - Set rooms = namespace.getRooms(socketIOClient); - - Set roomsList = namespaceRooms.get(namespace.getName()); - if (roomsList == null) { - roomsList = new HashSet(); - namespaceRooms.put(namespace.getName(), roomsList); - } - roomsList.addAll(rooms); - } - for (Entry> entry : namespaceRooms.entrySet()) { - for (String room : entry.getValue()) { - storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey())); - } - } - } - - public Collection getClients() { - return new IterableCollection(clients); - } - - @Override - public void send(Packet packet) { - for (SocketIOClient client : clients) { - client.send(packet); - } - dispatch(packet); - } - - public void send(Packet packet, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - client.send(packet, ackCallback.createClientCallback(client)); - } - ackCallback.loopFinished(); - } +public interface BroadcastOperations extends ClientOperations { - @Override - public void disconnect() { - for (SocketIOClient client : clients) { - client.disconnect(); - } - } + Collection getClients(); - public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { - Packet packet = new Packet(PacketType.MESSAGE); - packet.setSubType(PacketType.EVENT); - packet.setName(name); - packet.setData(Arrays.asList(data)); + void send(Packet packet, BroadcastAckCallback ackCallback); - for (SocketIOClient client : clients) { - if (client.getSessionId().equals(excludedClient.getSessionId())) { - continue; - } - client.send(packet); - } - dispatch(packet); - } - - @Override - public void sendEvent(String name, Object... data) { - Packet packet = new Packet(PacketType.MESSAGE); - packet.setSubType(PacketType.EVENT); - packet.setName(name); - packet.setData(Arrays.asList(data)); - send(packet); - } + void sendEvent(String name, SocketIOClient excludedClient, Object... data); - public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - client.sendEvent(name, ackCallback.createClientCallback(client), data); - } - ackCallback.loopFinished(); - } - - public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - if (client.getSessionId().equals(excludedClient.getSessionId())) { - continue; - } - client.sendEvent(name, ackCallback.createClientCallback(client), data); - } - ackCallback.loopFinished(); - } + void sendEvent(String name, Object data, BroadcastAckCallback ackCallback); + void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback); } diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/MultiRoomBroadcastOperations.java b/fine-socketio/src/main/java/com/fr/third/socketio/MultiRoomBroadcastOperations.java new file mode 100644 index 000000000..f31e73c09 --- /dev/null +++ b/fine-socketio/src/main/java/com/fr/third/socketio/MultiRoomBroadcastOperations.java @@ -0,0 +1,117 @@ +/** + * Copyright (c) 2012-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.fr.third.socketio; + +import com.fr.third.socketio.protocol.Packet; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * author: liangjiaqi + * date: 2020/8/8 6:02 PM + */ +public class MultiRoomBroadcastOperations implements BroadcastOperations { + + private Collection broadcastOperations; + + public MultiRoomBroadcastOperations(Collection broadcastOperations) { + this.broadcastOperations = broadcastOperations; + } + + @Override + public Collection getClients() { + Set clients = new HashSet(); + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return clients; + } + for( BroadcastOperations b : this.broadcastOperations ) { + clients.addAll( b.getClients() ); + } + return clients; + } + + @Override + public void send(Packet packet, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.send( packet, ackCallback ); + } + } + + @Override + public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, excludedClient, data ); + } + } + + @Override + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data, ackCallback ); + } + } + + @Override + public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data, excludedClient, ackCallback ); + } + } + + @Override + public void send(Packet packet) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.send( packet ); + } + } + + @Override + public void disconnect() { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.disconnect(); + } + } + + @Override + public void sendEvent(String name, Object... data) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data ); + } + } +} diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/SingleRoomBroadcastOperations.java b/fine-socketio/src/main/java/com/fr/third/socketio/SingleRoomBroadcastOperations.java new file mode 100644 index 000000000..317458e2f --- /dev/null +++ b/fine-socketio/src/main/java/com/fr/third/socketio/SingleRoomBroadcastOperations.java @@ -0,0 +1,123 @@ +/** + * Copyright (c) 2012-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.fr.third.socketio; + +import com.fr.third.socketio.misc.IterableCollection; +import com.fr.third.socketio.protocol.Packet; +import com.fr.third.socketio.protocol.PacketType; +import com.fr.third.socketio.store.StoreFactory; +import com.fr.third.socketio.store.pubsub.DispatchMessage; +import com.fr.third.socketio.store.pubsub.PubSubType; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Author: liangjiaqi + * Date: 2020/8/8 6:08 PM + */ +public class SingleRoomBroadcastOperations implements BroadcastOperations { + private final String namespace; + private final String room; + private final Iterable clients; + private final StoreFactory storeFactory; + + public SingleRoomBroadcastOperations(String namespace, String room, Iterable clients, StoreFactory storeFactory) { + super(); + this.namespace = namespace; + this.room = room; + this.clients = clients; + this.storeFactory = storeFactory; + } + + private void dispatch(Packet packet) { + this.storeFactory.pubSubStore().publish( + PubSubType.DISPATCH, + new DispatchMessage(this.room, packet, this.namespace)); + } + + @Override + public Collection getClients() { + return new IterableCollection(clients); + } + + @Override + public void send(Packet packet) { + for (SocketIOClient client : clients) { + client.send(packet); + } + dispatch(packet); + } + + @Override + public void send(Packet packet, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.send(packet, ackCallback.createClientCallback(client)); + } + ackCallback.loopFinished(); + } + + @Override + public void disconnect() { + for (SocketIOClient client : clients) { + client.disconnect(); + } + } + + @Override + public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { + Packet packet = new Packet(PacketType.MESSAGE); + packet.setSubType(PacketType.EVENT); + packet.setName(name); + packet.setData(Arrays.asList(data)); + + for (SocketIOClient client : clients) { + if (client.getSessionId().equals(excludedClient.getSessionId())) { + continue; + } + client.send(packet); + } + dispatch(packet); + } + + @Override + public void sendEvent(String name, Object... data) { + Packet packet = new Packet(PacketType.MESSAGE); + packet.setSubType(PacketType.EVENT); + packet.setName(name); + packet.setData(Arrays.asList(data)); + send(packet); + } + + @Override + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.sendEvent(name, ackCallback.createClientCallback(client), data); + } + ackCallback.loopFinished(); + } + + @Override + public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + if (client.getSessionId().equals(excludedClient.getSessionId())) { + continue; + } + client.sendEvent(name, ackCallback.createClientCallback(client), data); + } + ackCallback.loopFinished(); + } +} diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOChannelInitializer.java b/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOChannelInitializer.java index ae8133599..068adf330 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOChannelInitializer.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOChannelInitializer.java @@ -24,9 +24,6 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import com.fr.stable.ArrayUtils; -import com.fr.third.socketio.scheduler.HashedWheelTimeoutScheduler; -import com.fr.third.socketio.transport.PollingTransport; -import com.fr.third.socketio.transport.WebSocketTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +40,12 @@ import com.fr.third.socketio.protocol.JsonSupport; import com.fr.third.socketio.protocol.PacketDecoder; import com.fr.third.socketio.protocol.PacketEncoder; import com.fr.third.socketio.scheduler.CancelableScheduler; +import com.fr.third.socketio.scheduler.HashedWheelTimeoutScheduler; import com.fr.third.socketio.store.StoreFactory; import com.fr.third.socketio.store.pubsub.DisconnectMessage; import com.fr.third.socketio.store.pubsub.PubSubType; +import com.fr.third.socketio.transport.PollingTransport; +import com.fr.third.socketio.transport.WebSocketTransport; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOServer.java b/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOServer.java index 3d5796b2d..28127d7de 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOServer.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/SocketIOServer.java @@ -15,12 +15,7 @@ */ package com.fr.third.socketio; -import com.fr.third.socketio.listener.ClientListeners; -import com.fr.third.socketio.listener.ConnectListener; -import com.fr.third.socketio.listener.DataListener; -import com.fr.third.socketio.listener.DisconnectListener; -import com.fr.third.socketio.listener.MultiTypeEventListener; -import com.fr.third.socketio.listener.PingListener; +import com.fr.third.socketio.listener.*; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -34,7 +29,9 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -104,7 +101,16 @@ public class SocketIOServer implements ClientListeners { } public BroadcastOperations getBroadcastOperations() { - return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory()); + Collection namespaces = namespacesHub.getAllNamespaces(); + List list = new ArrayList(); + BroadcastOperations broadcast = null; + if( namespaces != null && namespaces.size() > 0 ) { + for( SocketIONamespace n : namespaces ) { + broadcast = n.getBroadcastOperations(); + list.add( broadcast ); + } + } + return new MultiRoomBroadcastOperations( list ); } /** @@ -115,8 +121,16 @@ public class SocketIOServer implements ClientListeners { * @return broadcast operations */ public BroadcastOperations getRoomOperations(String room) { - Iterable clients = namespacesHub.getRoomClients(room); - return new BroadcastOperations(clients, configCopy.getStoreFactory()); + Collection namespaces = namespacesHub.getAllNamespaces(); + List list = new ArrayList(); + BroadcastOperations broadcast = null; + if( namespaces != null && namespaces.size() > 0 ) { + for( SocketIONamespace n : namespaces ) { + broadcast = n.getRoomOperations( room ); + list.add( broadcast ); + } + } + return new MultiRoomBroadcastOperations( list ); } /** @@ -291,4 +305,3 @@ public class SocketIOServer implements ClientListeners { } - diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/ack/AckManager.java b/fine-socketio/src/main/java/com/fr/third/socketio/ack/AckManager.java index b941fc962..3016fb91d 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/ack/AckManager.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/ack/AckManager.java @@ -15,16 +15,12 @@ */ package com.fr.third.socketio.ack; +import com.fr.third.socketio.*; import com.fr.third.socketio.handler.ClientHead; import com.fr.third.socketio.protocol.Packet; import com.fr.third.socketio.scheduler.CancelableScheduler; import com.fr.third.socketio.scheduler.SchedulerKey; import com.fr.third.socketio.scheduler.SchedulerKey.Type; -import com.fr.third.socketio.AckCallback; -import com.fr.third.socketio.Disconnectable; -import com.fr.third.socketio.MultiTypeAckCallback; -import com.fr.third.socketio.MultiTypeArgs; -import com.fr.third.socketio.SocketIOClient; import io.netty.util.internal.PlatformDependent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/handler/AuthorizeHandler.java b/fine-socketio/src/main/java/com/fr/third/socketio/handler/AuthorizeHandler.java index db772781a..b7d29bf7f 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/handler/AuthorizeHandler.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/handler/AuthorizeHandler.java @@ -26,6 +26,9 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fr.third.socketio.Configuration; import com.fr.third.socketio.Disconnectable; import com.fr.third.socketio.DisconnectableHub; @@ -33,12 +36,6 @@ import com.fr.third.socketio.HandshakeData; import com.fr.third.socketio.SocketIOClient; import com.fr.third.socketio.Transport; import com.fr.third.socketio.ack.AckManager; -import com.fr.third.socketio.store.StoreFactory; -import com.fr.third.socketio.store.pubsub.ConnectMessage; -import com.fr.third.socketio.store.pubsub.PubSubType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fr.third.socketio.messages.HttpErrorMessage; import com.fr.third.socketio.namespace.Namespace; import com.fr.third.socketio.namespace.NamespacesHub; @@ -48,6 +45,9 @@ import com.fr.third.socketio.protocol.PacketType; import com.fr.third.socketio.scheduler.CancelableScheduler; import com.fr.third.socketio.scheduler.SchedulerKey; import com.fr.third.socketio.scheduler.SchedulerKey.Type; +import com.fr.third.socketio.store.StoreFactory; +import com.fr.third.socketio.store.pubsub.ConnectMessage; +import com.fr.third.socketio.store.pubsub.PubSubType; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/handler/ClientsBox.java b/fine-socketio/src/main/java/com/fr/third/socketio/handler/ClientsBox.java index e7c2a718b..d45667fb4 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/handler/ClientsBox.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/handler/ClientsBox.java @@ -15,13 +15,14 @@ */ package com.fr.third.socketio.handler; -import com.fr.third.socketio.HandshakeData; import io.netty.channel.Channel; import io.netty.util.internal.PlatformDependent; import java.util.Map; import java.util.UUID; +import com.fr.third.socketio.HandshakeData; + public class ClientsBox { private final Map uuid2clients = PlatformDependent.newConcurrentHashMap(); diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/handler/InPacketHandler.java b/fine-socketio/src/main/java/com/fr/third/socketio/handler/InPacketHandler.java index b213fec2f..3e5bf9850 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/handler/InPacketHandler.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/handler/InPacketHandler.java @@ -15,7 +15,6 @@ */ package com.fr.third.socketio.handler; -import com.fr.third.socketio.transport.NamespaceClient; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -32,6 +31,7 @@ import com.fr.third.socketio.namespace.NamespacesHub; import com.fr.third.socketio.protocol.Packet; import com.fr.third.socketio.protocol.PacketDecoder; import com.fr.third.socketio.protocol.PacketType; +import com.fr.third.socketio.transport.NamespaceClient; @Sharable public class InPacketHandler extends SimpleChannelInboundHandler { diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/listener/DataListener.java b/fine-socketio/src/main/java/com/fr/third/socketio/listener/DataListener.java index ebe81719a..cd5b6e2a9 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/listener/DataListener.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/listener/DataListener.java @@ -26,8 +26,7 @@ public interface DataListener { * @param client - receiver * @param data - received object * @param ackSender - ack request - * - * @throws Exception + * */ void onData(SocketIOClient client, T data, AckRequest ackSender) throws Exception; diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/listener/DefaultExceptionListener.java b/fine-socketio/src/main/java/com/fr/third/socketio/listener/DefaultExceptionListener.java index a9dbad9d2..829a56c6c 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/listener/DefaultExceptionListener.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/listener/DefaultExceptionListener.java @@ -15,7 +15,6 @@ */ package com.fr.third.socketio.listener; -import com.fr.third.socketio.SocketIOClient; import io.netty.channel.ChannelHandlerContext; import java.util.List; @@ -23,6 +22,8 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fr.third.socketio.SocketIOClient; + public class DefaultExceptionListener extends ExceptionListenerAdapter { private static final Logger log = LoggerFactory.getLogger(DefaultExceptionListener.class); diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListener.java b/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListener.java index 13d813560..f0e65a459 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListener.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListener.java @@ -15,11 +15,12 @@ */ package com.fr.third.socketio.listener; -import com.fr.third.socketio.SocketIOClient; import io.netty.channel.ChannelHandlerContext; import java.util.List; +import com.fr.third.socketio.SocketIOClient; + public interface ExceptionListener { void onEventException(Exception e, List args, SocketIOClient client); diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListenerAdapter.java b/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListenerAdapter.java index 1a12c9a41..dbf07edf7 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListenerAdapter.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListenerAdapter.java @@ -15,11 +15,12 @@ */ package com.fr.third.socketio.listener; -import com.fr.third.socketio.SocketIOClient; import io.netty.channel.ChannelHandlerContext; import java.util.List; +import com.fr.third.socketio.SocketIOClient; + /** * Base callback exceptions listener * diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/messages/PacketsMessage.java b/fine-socketio/src/main/java/com/fr/third/socketio/messages/PacketsMessage.java index 682045901..7a98e6211 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/messages/PacketsMessage.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/messages/PacketsMessage.java @@ -15,9 +15,10 @@ */ package com.fr.third.socketio.messages; +import io.netty.buffer.ByteBuf; + import com.fr.third.socketio.Transport; import com.fr.third.socketio.handler.ClientHead; -import io.netty.buffer.ByteBuf; public class PacketsMessage { diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterable.java b/fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterable.java index 251f4e2cf..76fedc7a3 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterable.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterable.java @@ -19,14 +19,11 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class CompositeIterable implements Iterable, Iterator { +public class CompositeIterable implements Iterable { private List> iterablesList; private Iterable[] iterables; - private Iterator> listIterator; - private Iterator currentIterator; - public CompositeIterable(List> iterables) { this.iterablesList = iterables; } @@ -52,35 +49,8 @@ public class CompositeIterable implements Iterable, Iterator { iterators.add(iterable.iterator()); } } - listIterator = iterators.iterator(); - currentIterator = null; - return this; - } - - @Override - public boolean hasNext() { - if (currentIterator == null || !currentIterator.hasNext()) { - while (listIterator.hasNext()) { - Iterator iterator = listIterator.next(); - if (iterator.hasNext()) { - currentIterator = iterator; - return true; - } - } - return false; - } - return currentIterator.hasNext(); + return new CompositeIterator(iterators.iterator()); } - @Override - public T next() { - hasNext(); - return currentIterator.next(); - } - - @Override - public void remove() { - currentIterator.remove(); - } } diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterator.java b/fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterator.java new file mode 100644 index 000000000..f891c2670 --- /dev/null +++ b/fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterator.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2012-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.fr.third.socketio.misc; + +import java.util.Iterator; + +public class CompositeIterator implements Iterator { + + private Iterator> listIterator; + private Iterator currentIterator; + + public CompositeIterator(Iterator> listIterator) { + this.currentIterator = null; + this.listIterator = listIterator; + } + + @Override + public boolean hasNext() { + if (currentIterator == null || !currentIterator.hasNext()) { + while (listIterator.hasNext()) { + Iterator iterator = listIterator.next(); + if (iterator.hasNext()) { + currentIterator = iterator; + return true; + } + } + return false; + } + return currentIterator.hasNext(); + } + + @Override + public T next() { + hasNext(); + return currentIterator.next(); + } + + @Override + public void remove() { + currentIterator.remove(); + } +} diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/namespace/Namespace.java b/fine-socketio/src/main/java/com/fr/third/socketio/namespace/Namespace.java index a947f6550..a8fa9b5f0 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/namespace/Namespace.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/namespace/Namespace.java @@ -31,6 +31,7 @@ import com.fr.third.socketio.AckRequest; import com.fr.third.socketio.BroadcastOperations; import com.fr.third.socketio.Configuration; import com.fr.third.socketio.MultiTypeArgs; +import com.fr.third.socketio.SingleRoomBroadcastOperations; import com.fr.third.socketio.SocketIOClient; import com.fr.third.socketio.SocketIONamespace; import com.fr.third.socketio.annotation.ScannerEngine; @@ -206,11 +207,10 @@ public class Namespace implements SocketIONamespace { Set joinedRooms = client.getAllRooms(); allClients.remove(client.getSessionId()); - leave(getName(), client.getSessionId()); - storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); - + // client must leave all rooms and publish the leave msg one by one on disconnect. for (String joinedRoom : joinedRooms) { leave(roomClients, joinedRoom, client.getSessionId()); + storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), joinedRoom, getName())); } clientRooms.remove(client.getSessionId()); @@ -285,12 +285,12 @@ public class Namespace implements SocketIONamespace { @Override public BroadcastOperations getBroadcastOperations() { - return new BroadcastOperations(allClients.values(), storeFactory); + return new SingleRoomBroadcastOperations(getName(), getName(), allClients.values(), storeFactory); } @Override public BroadcastOperations getRoomOperations(String room) { - return new BroadcastOperations(getRoomClients(room), storeFactory); + return new SingleRoomBroadcastOperations(getName(), room, getRoomClients(room), storeFactory); } @Override diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/namespace/NamespacesHub.java b/fine-socketio/src/main/java/com/fr/third/socketio/namespace/NamespacesHub.java index fbb0ff2b5..6ba5b5ed3 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/namespace/NamespacesHub.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/namespace/NamespacesHub.java @@ -15,10 +15,6 @@ */ package com.fr.third.socketio.namespace; -import com.fr.third.socketio.Configuration; -import com.fr.third.socketio.SocketIOClient; -import com.fr.third.socketio.SocketIONamespace; -import com.fr.third.socketio.misc.CompositeIterable; import io.netty.util.internal.PlatformDependent; import java.util.ArrayList; @@ -26,6 +22,11 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentMap; +import com.fr.third.socketio.Configuration; +import com.fr.third.socketio.SocketIOClient; +import com.fr.third.socketio.SocketIONamespace; +import com.fr.third.socketio.misc.CompositeIterable; + public class NamespacesHub { private final ConcurrentMap namespaces = PlatformDependent.newConcurrentHashMap(); diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Event.java b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Event.java index 942e65410..7a2ba2adf 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Event.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Event.java @@ -17,7 +17,7 @@ package com.fr.third.socketio.protocol; import java.util.List; -class Event { +public class Event { private String name; private List args; diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JacksonJsonSupport.java b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JacksonJsonSupport.java index fa2779be3..823f62a1b 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JacksonJsonSupport.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JacksonJsonSupport.java @@ -28,11 +28,11 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; -import com.fr.third.socketio.AckCallback; -import com.fr.third.socketio.MultiTypeAckCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fr.third.socketio.AckCallback; +import com.fr.third.socketio.MultiTypeAckCallback; import com.fr.third.socketio.namespace.Namespace; import com.fr.third.fasterxml.jackson.annotation.JsonInclude.Include; import com.fr.third.fasterxml.jackson.core.JsonGenerationException; diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JsonSupport.java b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JsonSupport.java index 31b896bc9..bc72326bc 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JsonSupport.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/JsonSupport.java @@ -15,13 +15,14 @@ */ package com.fr.third.socketio.protocol; -import com.fr.third.socketio.AckCallback; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import java.io.IOException; import java.util.List; +import com.fr.third.socketio.AckCallback; + /** * JSON infrastructure interface. * Allows to implement custom realizations diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Packet.java b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Packet.java index 150632971..0d1314cf6 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Packet.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/Packet.java @@ -77,6 +77,31 @@ public class Packet implements Serializable { return (T)data; } + /** + * Creates a copy of #{@link Packet} with new namespace set + * if it differs from current namespace. + * Otherwise, returns original object unchanged + * + * @param namespace + * @return packet + */ + public Packet withNsp(String namespace) { + if (this.nsp.equalsIgnoreCase(namespace)) { + return this; + } else { + Packet newPacket = new Packet(this.type); + newPacket.setAckId(this.ackId); + newPacket.setData(this.data); + newPacket.setDataSource(this.dataSource); + newPacket.setName(this.name); + newPacket.setSubType(this.subType); + newPacket.setNsp(namespace); + newPacket.attachments = this.attachments; + newPacket.attachmentsCount = this.attachmentsCount; + return newPacket; + } + } + public void setNsp(String endpoint) { this.nsp = endpoint; } diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketDecoder.java b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketDecoder.java index 7373d5b41..36e7df43a 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketDecoder.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketDecoder.java @@ -20,6 +20,7 @@ import com.fr.third.socketio.ack.AckManager; import com.fr.third.socketio.handler.ClientHead; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.handler.codec.base64.Base64; import io.netty.util.CharsetUtil; @@ -91,16 +92,6 @@ public class PacketDecoder { return PacketType.valueOfInner(typeId); } - @Deprecated - public Packet decodePacket(String string, UUID uuid) throws IOException { - ByteBuf buf = Unpooled.copiedBuffer(string, CharsetUtil.UTF_8); - try { - return null; - } finally { - buf.release(); - } - } - private boolean hasLengthHeader(ByteBuf buffer) { for (int i = 0; i < Math.min(buffer.readableBytes(), 10); i++) { byte b = buffer.getByte(buffer.readerIndex() + i); @@ -309,22 +300,27 @@ public class PacketDecoder { } } } - - private String readNamespace(ByteBuf frame){ + + private String readNamespace(ByteBuf frame) { + /** * namespace post request with url queryString, like * /message?a=1, * /message, */ - int endIndex = frame.bytesBefore((byte)'?'); - if(endIndex > 0){ - return readString(frame,endIndex); + ByteBuf buffer = frame.slice(); + // skip this frame + frame.readerIndex(frame.readerIndex() + frame.readableBytes()); + + int endIndex = buffer.bytesBefore((byte) '?'); + if (endIndex > 0) { + return readString(buffer, endIndex); } - endIndex = frame.bytesBefore((byte)','); - if(endIndex > 0){ - return readString(frame,endIndex); + endIndex = buffer.bytesBefore((byte) ','); + if (endIndex > 0) { + return readString(buffer, endIndex); } - return readString(frame); + return readString(buffer); } } diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketEncoder.java b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketEncoder.java index 6228c1b57..abdfe439c 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketEncoder.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketEncoder.java @@ -15,7 +15,6 @@ */ package com.fr.third.socketio.protocol; -import com.fr.third.socketio.Configuration; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; @@ -29,6 +28,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; +import com.fr.third.socketio.Configuration; + public class PacketEncoder { private static final byte[] BINARY_HEADER = "b4".getBytes(CharsetUtil.UTF_8); diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/UTF8CharsScanner.java b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/UTF8CharsScanner.java index d11ff1799..4e8728217 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/protocol/UTF8CharsScanner.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/protocol/UTF8CharsScanner.java @@ -86,15 +86,6 @@ public class UTF8CharsScanner { return i; } - public int getLength(ByteBuf inputBuffer, int start) { - int len = 0; - for (int i = start; i < inputBuffer.writerIndex();) { - i = getCharTailIndex(inputBuffer, i); - len++; - } - return len; - } - public int getActualLength(ByteBuf inputBuffer, int length) { int len = 0; int start = inputBuffer.readerIndex(); @@ -108,19 +99,4 @@ public class UTF8CharsScanner { throw new IllegalStateException(); } - - public int findTailIndex(ByteBuf inputBuffer, int start, int end, - int charsToRead) { - int len = 0; - int i = start; - while (i < end) { - i = getCharTailIndex(inputBuffer, i); - len++; - if (charsToRead == len) { - break; - } - } - return i; - } - } diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/store/pubsub/BaseStoreFactory.java b/fine-socketio/src/main/java/com/fr/third/socketio/store/pubsub/BaseStoreFactory.java index d578349f3..8a03bf09b 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/store/pubsub/BaseStoreFactory.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/store/pubsub/BaseStoreFactory.java @@ -15,7 +15,6 @@ */ package com.fr.third.socketio.store.pubsub; -import com.fr.third.socketio.store.StoreFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +22,7 @@ import com.fr.third.socketio.handler.AuthorizeHandler; import com.fr.third.socketio.handler.ClientHead; import com.fr.third.socketio.namespace.NamespacesHub; import com.fr.third.socketio.protocol.JsonSupport; +import com.fr.third.socketio.store.StoreFactory; public abstract class BaseStoreFactory implements StoreFactory { diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/transport/BaseTransport.java b/fine-socketio/src/main/java/com/fr/third/socketio/transport/BaseTransport.java deleted file mode 100644 index 15ee58fd5..000000000 --- a/fine-socketio/src/main/java/com/fr/third/socketio/transport/BaseTransport.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (c) 2012-2019 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.fr.third.socketio.transport; - -import io.netty.channel.ChannelInboundHandlerAdapter; - -import com.fr.third.socketio.Disconnectable; - -@Deprecated -public abstract class BaseTransport extends ChannelInboundHandlerAdapter implements Disconnectable { - -} diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/transport/NamespaceClient.java b/fine-socketio/src/main/java/com/fr/third/socketio/transport/NamespaceClient.java index cf564fe4d..85076d95a 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/transport/NamespaceClient.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/transport/NamespaceClient.java @@ -104,8 +104,8 @@ public class NamespaceClient implements SocketIOClient { if (!isConnected()) { return; } - packet.setNsp(namespace.getName()); - baseClient.send(packet); + + baseClient.send(packet.withNsp(namespace.getName())); } public void onDisconnect() { diff --git a/fine-socketio/src/main/java/com/fr/third/socketio/transport/WebSocketTransport.java b/fine-socketio/src/main/java/com/fr/third/socketio/transport/WebSocketTransport.java index 137cb15b3..c9da7f0fa 100644 --- a/fine-socketio/src/main/java/com/fr/third/socketio/transport/WebSocketTransport.java +++ b/fine-socketio/src/main/java/com/fr/third/socketio/transport/WebSocketTransport.java @@ -51,6 +51,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; +import io.netty.util.ReferenceCountUtil; @Sharable public class WebSocketTransport extends ChannelInboundHandlerAdapter {