Browse Source

Pull request #4102: KERNEL-9700 update: netty-socket.io框架需要升级到最新的版本

Merge in CORE/base-third from ~LIDONGY/base-third:feature/x to feature/x

* commit '6e5b5ef1dfbd5f02108ce8323e82cab8096f69d7':
  KERNEL-9700 update: netty-socket.io框架需要升级到最新的版本
feature/x
lidongy 3 years ago
parent
commit
23ab285be5
  1. 4
      fine-socketio/src/main/java/com/fr/third/socketio/AckCallback.java
  2. 117
      fine-socketio/src/main/java/com/fr/third/socketio/BroadcastOperations.java
  3. 117
      fine-socketio/src/main/java/com/fr/third/socketio/MultiRoomBroadcastOperations.java
  4. 123
      fine-socketio/src/main/java/com/fr/third/socketio/SingleRoomBroadcastOperations.java
  5. 6
      fine-socketio/src/main/java/com/fr/third/socketio/SocketIOChannelInitializer.java
  6. 33
      fine-socketio/src/main/java/com/fr/third/socketio/SocketIOServer.java
  7. 6
      fine-socketio/src/main/java/com/fr/third/socketio/ack/AckManager.java
  8. 12
      fine-socketio/src/main/java/com/fr/third/socketio/handler/AuthorizeHandler.java
  9. 3
      fine-socketio/src/main/java/com/fr/third/socketio/handler/ClientsBox.java
  10. 2
      fine-socketio/src/main/java/com/fr/third/socketio/handler/InPacketHandler.java
  11. 3
      fine-socketio/src/main/java/com/fr/third/socketio/listener/DataListener.java
  12. 3
      fine-socketio/src/main/java/com/fr/third/socketio/listener/DefaultExceptionListener.java
  13. 3
      fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListener.java
  14. 3
      fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListenerAdapter.java
  15. 3
      fine-socketio/src/main/java/com/fr/third/socketio/messages/PacketsMessage.java
  16. 34
      fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterable.java
  17. 55
      fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterator.java
  18. 10
      fine-socketio/src/main/java/com/fr/third/socketio/namespace/Namespace.java
  19. 9
      fine-socketio/src/main/java/com/fr/third/socketio/namespace/NamespacesHub.java
  20. 2
      fine-socketio/src/main/java/com/fr/third/socketio/protocol/Event.java
  21. 4
      fine-socketio/src/main/java/com/fr/third/socketio/protocol/JacksonJsonSupport.java
  22. 3
      fine-socketio/src/main/java/com/fr/third/socketio/protocol/JsonSupport.java
  23. 25
      fine-socketio/src/main/java/com/fr/third/socketio/protocol/Packet.java
  24. 34
      fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketDecoder.java
  25. 3
      fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketEncoder.java
  26. 24
      fine-socketio/src/main/java/com/fr/third/socketio/protocol/UTF8CharsScanner.java
  27. 2
      fine-socketio/src/main/java/com/fr/third/socketio/store/pubsub/BaseStoreFactory.java
  28. 25
      fine-socketio/src/main/java/com/fr/third/socketio/transport/BaseTransport.java
  29. 4
      fine-socketio/src/main/java/com/fr/third/socketio/transport/NamespaceClient.java
  30. 1
      fine-socketio/src/main/java/com/fr/third/socketio/transport/WebSocketTransport.java

4
fine-socketio/src/main/java/com/fr/third/socketio/AckCallback.java

@ -31,8 +31,8 @@ package com.fr.third.socketio;
*
* @param <T> - any serializable type
*
* @see VoidAckCallback
* @see MultiTypeAckCallback
* @see com.fr.third.socketio.VoidAckCallback
* @see com.fr.third.socketio.MultiTypeAckCallback
*
*/
public abstract class AckCallback<T> {

117
fine-socketio/src/main/java/com/fr/third/socketio/BroadcastOperations.java

@ -15,123 +15,24 @@
*/
package com.fr.third.socketio;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import com.fr.third.socketio.misc.IterableCollection;
import com.fr.third.socketio.namespace.Namespace;
import com.fr.third.socketio.protocol.Packet;
import com.fr.third.socketio.protocol.PacketType;
import com.fr.third.socketio.store.StoreFactory;
import com.fr.third.socketio.store.pubsub.DispatchMessage;
import com.fr.third.socketio.store.pubsub.PubSubType;
import java.util.Collection;
/**
* Fully thread-safe.
* broadcast interface
*
*/
public class BroadcastOperations implements ClientOperations {
private final Iterable<SocketIOClient> clients;
private final StoreFactory storeFactory;
public BroadcastOperations(Iterable<SocketIOClient> clients, StoreFactory storeFactory) {
super();
this.clients = clients;
this.storeFactory = storeFactory;
}
private void dispatch(Packet packet) {
Map<String, Set<String>> namespaceRooms = new HashMap<String, Set<String>>();
for (SocketIOClient socketIOClient : clients) {
Namespace namespace = (Namespace)socketIOClient.getNamespace();
Set<String> rooms = namespace.getRooms(socketIOClient);
Set<String> roomsList = namespaceRooms.get(namespace.getName());
if (roomsList == null) {
roomsList = new HashSet<String>();
namespaceRooms.put(namespace.getName(), roomsList);
}
roomsList.addAll(rooms);
}
for (Entry<String, Set<String>> entry : namespaceRooms.entrySet()) {
for (String room : entry.getValue()) {
storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
}
}
}
public Collection<SocketIOClient> getClients() {
return new IterableCollection<SocketIOClient>(clients);
}
@Override
public void send(Packet packet) {
for (SocketIOClient client : clients) {
client.send(packet);
}
dispatch(packet);
}
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
client.send(packet, ackCallback.createClientCallback(client));
}
ackCallback.loopFinished();
}
public interface BroadcastOperations extends ClientOperations {
@Override
public void disconnect() {
for (SocketIOClient client : clients) {
client.disconnect();
}
}
Collection<SocketIOClient> getClients();
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setSubType(PacketType.EVENT);
packet.setName(name);
packet.setData(Arrays.asList(data));
<T> void send(Packet packet, BroadcastAckCallback<T> ackCallback);
for (SocketIOClient client : clients) {
if (client.getSessionId().equals(excludedClient.getSessionId())) {
continue;
}
client.send(packet);
}
dispatch(packet);
}
@Override
public void sendEvent(String name, Object... data) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setSubType(PacketType.EVENT);
packet.setName(name);
packet.setData(Arrays.asList(data));
send(packet);
}
void sendEvent(String name, SocketIOClient excludedClient, Object... data);
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
client.sendEvent(name, ackCallback.createClientCallback(client), data);
}
ackCallback.loopFinished();
}
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
if (client.getSessionId().equals(excludedClient.getSessionId())) {
continue;
}
client.sendEvent(name, ackCallback.createClientCallback(client), data);
}
ackCallback.loopFinished();
}
<T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback);
<T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback);
}

117
fine-socketio/src/main/java/com/fr/third/socketio/MultiRoomBroadcastOperations.java

@ -0,0 +1,117 @@
/**
* Copyright (c) 2012-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.socketio;
import com.fr.third.socketio.protocol.Packet;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/**
* author: liangjiaqi
* date: 2020/8/8 6:02 PM
*/
public class MultiRoomBroadcastOperations implements BroadcastOperations {
private Collection<BroadcastOperations> broadcastOperations;
public MultiRoomBroadcastOperations(Collection<BroadcastOperations> broadcastOperations) {
this.broadcastOperations = broadcastOperations;
}
@Override
public Collection<SocketIOClient> getClients() {
Set<SocketIOClient> clients = new HashSet<SocketIOClient>();
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return clients;
}
for( BroadcastOperations b : this.broadcastOperations ) {
clients.addAll( b.getClients() );
}
return clients;
}
@Override
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return;
}
for( BroadcastOperations b : this.broadcastOperations ) {
b.send( packet, ackCallback );
}
}
@Override
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return;
}
for( BroadcastOperations b : this.broadcastOperations ) {
b.sendEvent( name, excludedClient, data );
}
}
@Override
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return;
}
for( BroadcastOperations b : this.broadcastOperations ) {
b.sendEvent( name, data, ackCallback );
}
}
@Override
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) {
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return;
}
for( BroadcastOperations b : this.broadcastOperations ) {
b.sendEvent( name, data, excludedClient, ackCallback );
}
}
@Override
public void send(Packet packet) {
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return;
}
for( BroadcastOperations b : this.broadcastOperations ) {
b.send( packet );
}
}
@Override
public void disconnect() {
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return;
}
for( BroadcastOperations b : this.broadcastOperations ) {
b.disconnect();
}
}
@Override
public void sendEvent(String name, Object... data) {
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
return;
}
for( BroadcastOperations b : this.broadcastOperations ) {
b.sendEvent( name, data );
}
}
}

123
fine-socketio/src/main/java/com/fr/third/socketio/SingleRoomBroadcastOperations.java

@ -0,0 +1,123 @@
/**
* Copyright (c) 2012-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.socketio;
import com.fr.third.socketio.misc.IterableCollection;
import com.fr.third.socketio.protocol.Packet;
import com.fr.third.socketio.protocol.PacketType;
import com.fr.third.socketio.store.StoreFactory;
import com.fr.third.socketio.store.pubsub.DispatchMessage;
import com.fr.third.socketio.store.pubsub.PubSubType;
import java.util.Arrays;
import java.util.Collection;
/**
* Author: liangjiaqi
* Date: 2020/8/8 6:08 PM
*/
public class SingleRoomBroadcastOperations implements BroadcastOperations {
private final String namespace;
private final String room;
private final Iterable<SocketIOClient> clients;
private final StoreFactory storeFactory;
public SingleRoomBroadcastOperations(String namespace, String room, Iterable<SocketIOClient> clients, StoreFactory storeFactory) {
super();
this.namespace = namespace;
this.room = room;
this.clients = clients;
this.storeFactory = storeFactory;
}
private void dispatch(Packet packet) {
this.storeFactory.pubSubStore().publish(
PubSubType.DISPATCH,
new DispatchMessage(this.room, packet, this.namespace));
}
@Override
public Collection<SocketIOClient> getClients() {
return new IterableCollection<SocketIOClient>(clients);
}
@Override
public void send(Packet packet) {
for (SocketIOClient client : clients) {
client.send(packet);
}
dispatch(packet);
}
@Override
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
client.send(packet, ackCallback.createClientCallback(client));
}
ackCallback.loopFinished();
}
@Override
public void disconnect() {
for (SocketIOClient client : clients) {
client.disconnect();
}
}
@Override
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setSubType(PacketType.EVENT);
packet.setName(name);
packet.setData(Arrays.asList(data));
for (SocketIOClient client : clients) {
if (client.getSessionId().equals(excludedClient.getSessionId())) {
continue;
}
client.send(packet);
}
dispatch(packet);
}
@Override
public void sendEvent(String name, Object... data) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setSubType(PacketType.EVENT);
packet.setName(name);
packet.setData(Arrays.asList(data));
send(packet);
}
@Override
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
client.sendEvent(name, ackCallback.createClientCallback(client), data);
}
ackCallback.loopFinished();
}
@Override
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
if (client.getSessionId().equals(excludedClient.getSessionId())) {
continue;
}
client.sendEvent(name, ackCallback.createClientCallback(client), data);
}
ackCallback.loopFinished();
}
}

6
fine-socketio/src/main/java/com/fr/third/socketio/SocketIOChannelInitializer.java

@ -24,9 +24,6 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import com.fr.stable.ArrayUtils;
import com.fr.third.socketio.scheduler.HashedWheelTimeoutScheduler;
import com.fr.third.socketio.transport.PollingTransport;
import com.fr.third.socketio.transport.WebSocketTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,9 +40,12 @@ import com.fr.third.socketio.protocol.JsonSupport;
import com.fr.third.socketio.protocol.PacketDecoder;
import com.fr.third.socketio.protocol.PacketEncoder;
import com.fr.third.socketio.scheduler.CancelableScheduler;
import com.fr.third.socketio.scheduler.HashedWheelTimeoutScheduler;
import com.fr.third.socketio.store.StoreFactory;
import com.fr.third.socketio.store.pubsub.DisconnectMessage;
import com.fr.third.socketio.store.pubsub.PubSubType;
import com.fr.third.socketio.transport.PollingTransport;
import com.fr.third.socketio.transport.WebSocketTransport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;

33
fine-socketio/src/main/java/com/fr/third/socketio/SocketIOServer.java

@ -15,12 +15,7 @@
*/
package com.fr.third.socketio;
import com.fr.third.socketio.listener.ClientListeners;
import com.fr.third.socketio.listener.ConnectListener;
import com.fr.third.socketio.listener.DataListener;
import com.fr.third.socketio.listener.DisconnectListener;
import com.fr.third.socketio.listener.MultiTypeEventListener;
import com.fr.third.socketio.listener.PingListener;
import com.fr.third.socketio.listener.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@ -34,7 +29,9 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -104,7 +101,16 @@ public class SocketIOServer implements ClientListeners {
}
public BroadcastOperations getBroadcastOperations() {
return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory());
Collection<SocketIONamespace> namespaces = namespacesHub.getAllNamespaces();
List<BroadcastOperations> list = new ArrayList<BroadcastOperations>();
BroadcastOperations broadcast = null;
if( namespaces != null && namespaces.size() > 0 ) {
for( SocketIONamespace n : namespaces ) {
broadcast = n.getBroadcastOperations();
list.add( broadcast );
}
}
return new MultiRoomBroadcastOperations( list );
}
/**
@ -115,8 +121,16 @@ public class SocketIOServer implements ClientListeners {
* @return broadcast operations
*/
public BroadcastOperations getRoomOperations(String room) {
Iterable<SocketIOClient> clients = namespacesHub.getRoomClients(room);
return new BroadcastOperations(clients, configCopy.getStoreFactory());
Collection<SocketIONamespace> namespaces = namespacesHub.getAllNamespaces();
List<BroadcastOperations> list = new ArrayList<BroadcastOperations>();
BroadcastOperations broadcast = null;
if( namespaces != null && namespaces.size() > 0 ) {
for( SocketIONamespace n : namespaces ) {
broadcast = n.getRoomOperations( room );
list.add( broadcast );
}
}
return new MultiRoomBroadcastOperations( list );
}
/**
@ -291,4 +305,3 @@ public class SocketIOServer implements ClientListeners {
}

6
fine-socketio/src/main/java/com/fr/third/socketio/ack/AckManager.java

@ -15,16 +15,12 @@
*/
package com.fr.third.socketio.ack;
import com.fr.third.socketio.*;
import com.fr.third.socketio.handler.ClientHead;
import com.fr.third.socketio.protocol.Packet;
import com.fr.third.socketio.scheduler.CancelableScheduler;
import com.fr.third.socketio.scheduler.SchedulerKey;
import com.fr.third.socketio.scheduler.SchedulerKey.Type;
import com.fr.third.socketio.AckCallback;
import com.fr.third.socketio.Disconnectable;
import com.fr.third.socketio.MultiTypeAckCallback;
import com.fr.third.socketio.MultiTypeArgs;
import com.fr.third.socketio.SocketIOClient;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

12
fine-socketio/src/main/java/com/fr/third/socketio/handler/AuthorizeHandler.java

@ -26,6 +26,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fr.third.socketio.Configuration;
import com.fr.third.socketio.Disconnectable;
import com.fr.third.socketio.DisconnectableHub;
@ -33,12 +36,6 @@ import com.fr.third.socketio.HandshakeData;
import com.fr.third.socketio.SocketIOClient;
import com.fr.third.socketio.Transport;
import com.fr.third.socketio.ack.AckManager;
import com.fr.third.socketio.store.StoreFactory;
import com.fr.third.socketio.store.pubsub.ConnectMessage;
import com.fr.third.socketio.store.pubsub.PubSubType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fr.third.socketio.messages.HttpErrorMessage;
import com.fr.third.socketio.namespace.Namespace;
import com.fr.third.socketio.namespace.NamespacesHub;
@ -48,6 +45,9 @@ import com.fr.third.socketio.protocol.PacketType;
import com.fr.third.socketio.scheduler.CancelableScheduler;
import com.fr.third.socketio.scheduler.SchedulerKey;
import com.fr.third.socketio.scheduler.SchedulerKey.Type;
import com.fr.third.socketio.store.StoreFactory;
import com.fr.third.socketio.store.pubsub.ConnectMessage;
import com.fr.third.socketio.store.pubsub.PubSubType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

3
fine-socketio/src/main/java/com/fr/third/socketio/handler/ClientsBox.java

@ -15,13 +15,14 @@
*/
package com.fr.third.socketio.handler;
import com.fr.third.socketio.HandshakeData;
import io.netty.channel.Channel;
import io.netty.util.internal.PlatformDependent;
import java.util.Map;
import java.util.UUID;
import com.fr.third.socketio.HandshakeData;
public class ClientsBox {
private final Map<UUID, ClientHead> uuid2clients = PlatformDependent.newConcurrentHashMap();

2
fine-socketio/src/main/java/com/fr/third/socketio/handler/InPacketHandler.java

@ -15,7 +15,6 @@
*/
package com.fr.third.socketio.handler;
import com.fr.third.socketio.transport.NamespaceClient;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@ -32,6 +31,7 @@ import com.fr.third.socketio.namespace.NamespacesHub;
import com.fr.third.socketio.protocol.Packet;
import com.fr.third.socketio.protocol.PacketDecoder;
import com.fr.third.socketio.protocol.PacketType;
import com.fr.third.socketio.transport.NamespaceClient;
@Sharable
public class InPacketHandler extends SimpleChannelInboundHandler<PacketsMessage> {

3
fine-socketio/src/main/java/com/fr/third/socketio/listener/DataListener.java

@ -26,8 +26,7 @@ public interface DataListener<T> {
* @param client - receiver
* @param data - received object
* @param ackSender - ack request
*
* @throws Exception
*
*/
void onData(SocketIOClient client, T data, AckRequest ackSender) throws Exception;

3
fine-socketio/src/main/java/com/fr/third/socketio/listener/DefaultExceptionListener.java

@ -15,7 +15,6 @@
*/
package com.fr.third.socketio.listener;
import com.fr.third.socketio.SocketIOClient;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
@ -23,6 +22,8 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fr.third.socketio.SocketIOClient;
public class DefaultExceptionListener extends ExceptionListenerAdapter {
private static final Logger log = LoggerFactory.getLogger(DefaultExceptionListener.class);

3
fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListener.java

@ -15,11 +15,12 @@
*/
package com.fr.third.socketio.listener;
import com.fr.third.socketio.SocketIOClient;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import com.fr.third.socketio.SocketIOClient;
public interface ExceptionListener {
void onEventException(Exception e, List<Object> args, SocketIOClient client);

3
fine-socketio/src/main/java/com/fr/third/socketio/listener/ExceptionListenerAdapter.java

@ -15,11 +15,12 @@
*/
package com.fr.third.socketio.listener;
import com.fr.third.socketio.SocketIOClient;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import com.fr.third.socketio.SocketIOClient;
/**
* Base callback exceptions listener
*

3
fine-socketio/src/main/java/com/fr/third/socketio/messages/PacketsMessage.java

@ -15,9 +15,10 @@
*/
package com.fr.third.socketio.messages;
import io.netty.buffer.ByteBuf;
import com.fr.third.socketio.Transport;
import com.fr.third.socketio.handler.ClientHead;
import io.netty.buffer.ByteBuf;
public class PacketsMessage {

34
fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterable.java

@ -19,14 +19,11 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CompositeIterable<T> implements Iterable<T>, Iterator<T> {
public class CompositeIterable<T> implements Iterable<T> {
private List<Iterable<T>> iterablesList;
private Iterable<T>[] iterables;
private Iterator<Iterator<T>> listIterator;
private Iterator<T> currentIterator;
public CompositeIterable(List<Iterable<T>> iterables) {
this.iterablesList = iterables;
}
@ -52,35 +49,8 @@ public class CompositeIterable<T> implements Iterable<T>, Iterator<T> {
iterators.add(iterable.iterator());
}
}
listIterator = iterators.iterator();
currentIterator = null;
return this;
}
@Override
public boolean hasNext() {
if (currentIterator == null || !currentIterator.hasNext()) {
while (listIterator.hasNext()) {
Iterator<T> iterator = listIterator.next();
if (iterator.hasNext()) {
currentIterator = iterator;
return true;
}
}
return false;
}
return currentIterator.hasNext();
return new CompositeIterator<T>(iterators.iterator());
}
@Override
public T next() {
hasNext();
return currentIterator.next();
}
@Override
public void remove() {
currentIterator.remove();
}
}

55
fine-socketio/src/main/java/com/fr/third/socketio/misc/CompositeIterator.java

@ -0,0 +1,55 @@
/**
* Copyright (c) 2012-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.socketio.misc;
import java.util.Iterator;
public class CompositeIterator<T> implements Iterator<T> {
private Iterator<Iterator<T>> listIterator;
private Iterator<T> currentIterator;
public CompositeIterator(Iterator<Iterator<T>> listIterator) {
this.currentIterator = null;
this.listIterator = listIterator;
}
@Override
public boolean hasNext() {
if (currentIterator == null || !currentIterator.hasNext()) {
while (listIterator.hasNext()) {
Iterator<T> iterator = listIterator.next();
if (iterator.hasNext()) {
currentIterator = iterator;
return true;
}
}
return false;
}
return currentIterator.hasNext();
}
@Override
public T next() {
hasNext();
return currentIterator.next();
}
@Override
public void remove() {
currentIterator.remove();
}
}

10
fine-socketio/src/main/java/com/fr/third/socketio/namespace/Namespace.java

@ -31,6 +31,7 @@ import com.fr.third.socketio.AckRequest;
import com.fr.third.socketio.BroadcastOperations;
import com.fr.third.socketio.Configuration;
import com.fr.third.socketio.MultiTypeArgs;
import com.fr.third.socketio.SingleRoomBroadcastOperations;
import com.fr.third.socketio.SocketIOClient;
import com.fr.third.socketio.SocketIONamespace;
import com.fr.third.socketio.annotation.ScannerEngine;
@ -206,11 +207,10 @@ public class Namespace implements SocketIONamespace {
Set<String> joinedRooms = client.getAllRooms();
allClients.remove(client.getSessionId());
leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
// client must leave all rooms and publish the leave msg one by one on disconnect.
for (String joinedRoom : joinedRooms) {
leave(roomClients, joinedRoom, client.getSessionId());
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), joinedRoom, getName()));
}
clientRooms.remove(client.getSessionId());
@ -285,12 +285,12 @@ public class Namespace implements SocketIONamespace {
@Override
public BroadcastOperations getBroadcastOperations() {
return new BroadcastOperations(allClients.values(), storeFactory);
return new SingleRoomBroadcastOperations(getName(), getName(), allClients.values(), storeFactory);
}
@Override
public BroadcastOperations getRoomOperations(String room) {
return new BroadcastOperations(getRoomClients(room), storeFactory);
return new SingleRoomBroadcastOperations(getName(), room, getRoomClients(room), storeFactory);
}
@Override

9
fine-socketio/src/main/java/com/fr/third/socketio/namespace/NamespacesHub.java

@ -15,10 +15,6 @@
*/
package com.fr.third.socketio.namespace;
import com.fr.third.socketio.Configuration;
import com.fr.third.socketio.SocketIOClient;
import com.fr.third.socketio.SocketIONamespace;
import com.fr.third.socketio.misc.CompositeIterable;
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayList;
@ -26,6 +22,11 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import com.fr.third.socketio.Configuration;
import com.fr.third.socketio.SocketIOClient;
import com.fr.third.socketio.SocketIONamespace;
import com.fr.third.socketio.misc.CompositeIterable;
public class NamespacesHub {
private final ConcurrentMap<String, SocketIONamespace> namespaces = PlatformDependent.newConcurrentHashMap();

2
fine-socketio/src/main/java/com/fr/third/socketio/protocol/Event.java

@ -17,7 +17,7 @@ package com.fr.third.socketio.protocol;
import java.util.List;
class Event {
public class Event {
private String name;
private List<Object> args;

4
fine-socketio/src/main/java/com/fr/third/socketio/protocol/JacksonJsonSupport.java

@ -28,11 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import com.fr.third.socketio.AckCallback;
import com.fr.third.socketio.MultiTypeAckCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fr.third.socketio.AckCallback;
import com.fr.third.socketio.MultiTypeAckCallback;
import com.fr.third.socketio.namespace.Namespace;
import com.fr.third.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fr.third.fasterxml.jackson.core.JsonGenerationException;

3
fine-socketio/src/main/java/com/fr/third/socketio/protocol/JsonSupport.java

@ -15,13 +15,14 @@
*/
package com.fr.third.socketio.protocol;
import com.fr.third.socketio.AckCallback;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import java.io.IOException;
import java.util.List;
import com.fr.third.socketio.AckCallback;
/**
* JSON infrastructure interface.
* Allows to implement custom realizations

25
fine-socketio/src/main/java/com/fr/third/socketio/protocol/Packet.java

@ -77,6 +77,31 @@ public class Packet implements Serializable {
return (T)data;
}
/**
* Creates a copy of #{@link Packet} with new namespace set
* if it differs from current namespace.
* Otherwise, returns original object unchanged
*
* @param namespace
* @return packet
*/
public Packet withNsp(String namespace) {
if (this.nsp.equalsIgnoreCase(namespace)) {
return this;
} else {
Packet newPacket = new Packet(this.type);
newPacket.setAckId(this.ackId);
newPacket.setData(this.data);
newPacket.setDataSource(this.dataSource);
newPacket.setName(this.name);
newPacket.setSubType(this.subType);
newPacket.setNsp(namespace);
newPacket.attachments = this.attachments;
newPacket.attachmentsCount = this.attachmentsCount;
return newPacket;
}
}
public void setNsp(String endpoint) {
this.nsp = endpoint;
}

34
fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketDecoder.java

@ -20,6 +20,7 @@ import com.fr.third.socketio.ack.AckManager;
import com.fr.third.socketio.handler.ClientHead;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.util.CharsetUtil;
@ -91,16 +92,6 @@ public class PacketDecoder {
return PacketType.valueOfInner(typeId);
}
@Deprecated
public Packet decodePacket(String string, UUID uuid) throws IOException {
ByteBuf buf = Unpooled.copiedBuffer(string, CharsetUtil.UTF_8);
try {
return null;
} finally {
buf.release();
}
}
private boolean hasLengthHeader(ByteBuf buffer) {
for (int i = 0; i < Math.min(buffer.readableBytes(), 10); i++) {
byte b = buffer.getByte(buffer.readerIndex() + i);
@ -309,22 +300,27 @@ public class PacketDecoder {
}
}
}
private String readNamespace(ByteBuf frame){
private String readNamespace(ByteBuf frame) {
/**
* namespace post request with url queryString, like
* /message?a=1,
* /message,
*/
int endIndex = frame.bytesBefore((byte)'?');
if(endIndex > 0){
return readString(frame,endIndex);
ByteBuf buffer = frame.slice();
// skip this frame
frame.readerIndex(frame.readerIndex() + frame.readableBytes());
int endIndex = buffer.bytesBefore((byte) '?');
if (endIndex > 0) {
return readString(buffer, endIndex);
}
endIndex = frame.bytesBefore((byte)',');
if(endIndex > 0){
return readString(frame,endIndex);
endIndex = buffer.bytesBefore((byte) ',');
if (endIndex > 0) {
return readString(buffer, endIndex);
}
return readString(frame);
return readString(buffer);
}
}

3
fine-socketio/src/main/java/com/fr/third/socketio/protocol/PacketEncoder.java

@ -15,7 +15,6 @@
*/
package com.fr.third.socketio.protocol;
import com.fr.third.socketio.Configuration;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
@ -29,6 +28,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import com.fr.third.socketio.Configuration;
public class PacketEncoder {
private static final byte[] BINARY_HEADER = "b4".getBytes(CharsetUtil.UTF_8);

24
fine-socketio/src/main/java/com/fr/third/socketio/protocol/UTF8CharsScanner.java

@ -86,15 +86,6 @@ public class UTF8CharsScanner {
return i;
}
public int getLength(ByteBuf inputBuffer, int start) {
int len = 0;
for (int i = start; i < inputBuffer.writerIndex();) {
i = getCharTailIndex(inputBuffer, i);
len++;
}
return len;
}
public int getActualLength(ByteBuf inputBuffer, int length) {
int len = 0;
int start = inputBuffer.readerIndex();
@ -108,19 +99,4 @@ public class UTF8CharsScanner {
throw new IllegalStateException();
}
public int findTailIndex(ByteBuf inputBuffer, int start, int end,
int charsToRead) {
int len = 0;
int i = start;
while (i < end) {
i = getCharTailIndex(inputBuffer, i);
len++;
if (charsToRead == len) {
break;
}
}
return i;
}
}

2
fine-socketio/src/main/java/com/fr/third/socketio/store/pubsub/BaseStoreFactory.java

@ -15,7 +15,6 @@
*/
package com.fr.third.socketio.store.pubsub;
import com.fr.third.socketio.store.StoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,6 +22,7 @@ import com.fr.third.socketio.handler.AuthorizeHandler;
import com.fr.third.socketio.handler.ClientHead;
import com.fr.third.socketio.namespace.NamespacesHub;
import com.fr.third.socketio.protocol.JsonSupport;
import com.fr.third.socketio.store.StoreFactory;
public abstract class BaseStoreFactory implements StoreFactory {

25
fine-socketio/src/main/java/com/fr/third/socketio/transport/BaseTransport.java

@ -1,25 +0,0 @@
/**
* Copyright (c) 2012-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.socketio.transport;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.fr.third.socketio.Disconnectable;
@Deprecated
public abstract class BaseTransport extends ChannelInboundHandlerAdapter implements Disconnectable {
}

4
fine-socketio/src/main/java/com/fr/third/socketio/transport/NamespaceClient.java

@ -104,8 +104,8 @@ public class NamespaceClient implements SocketIOClient {
if (!isConnected()) {
return;
}
packet.setNsp(namespace.getName());
baseClient.send(packet);
baseClient.send(packet.withNsp(namespace.getName()));
}
public void onDisconnect() {

1
fine-socketio/src/main/java/com/fr/third/socketio/transport/WebSocketTransport.java

@ -51,6 +51,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.ReferenceCountUtil;
@Sharable
public class WebSocketTransport extends ChannelInboundHandlerAdapter {

Loading…
Cancel
Save