|
|
@ -86,13 +86,28 @@ public class ClientHead { |
|
|
|
channels.put(Transport.WEBSOCKET, new TransportState()); |
|
|
|
channels.put(Transport.WEBSOCKET, new TransportState()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void bindChannel(Channel channel, Transport transport) { |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 这里多线程下,会有问题, |
|
|
|
|
|
|
|
* 因为 channel 不会变, 但是 state 可能因为多线程下调用 bindChannel |
|
|
|
|
|
|
|
* 从而导致 state.update , 致使真正 add() 时, |
|
|
|
|
|
|
|
* channel 和 state 得数据不一致。 |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* 解决方法 |
|
|
|
|
|
|
|
* 1. 加锁 |
|
|
|
|
|
|
|
* 2. 为 clientbox 加一个子线程, 定时清除 disconnected 为 true 的值。 |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param channel 频道 |
|
|
|
|
|
|
|
* @param transport 协议 |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public synchronized void bindChannel(Channel channel, Transport transport) { |
|
|
|
log.debug("binding channel: {} to transport: {}", channel, transport); |
|
|
|
log.debug("binding channel: {} to transport: {}", channel, transport); |
|
|
|
|
|
|
|
|
|
|
|
TransportState state = channels.get(transport); |
|
|
|
TransportState state = channels.get(transport); |
|
|
|
Channel prevChannel = state.update(channel); |
|
|
|
Channel prevChannel = state.update(channel); |
|
|
|
if (prevChannel != null) { |
|
|
|
if (prevChannel != null) { |
|
|
|
clientsBox.remove(prevChannel); |
|
|
|
clientsBox.remove(prevChannel); |
|
|
|
|
|
|
|
prevChannel.disconnect(); |
|
|
|
} |
|
|
|
} |
|
|
|
clientsBox.add(channel, this); |
|
|
|
clientsBox.add(channel, this); |
|
|
|
|
|
|
|
|
|
|
@ -122,16 +137,7 @@ public class ClientHead { |
|
|
|
|
|
|
|
|
|
|
|
public void schedulePingTimeout() { |
|
|
|
public void schedulePingTimeout() { |
|
|
|
SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId); |
|
|
|
SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId); |
|
|
|
disconnectScheduler.schedule(key, new Runnable() { |
|
|
|
disconnectScheduler.schedule(key, new TimeOutTask(this), configuration.getPingTimeout() + configuration.getPingInterval(), TimeUnit.MILLISECONDS); |
|
|
|
@Override |
|
|
|
|
|
|
|
public void run() { |
|
|
|
|
|
|
|
ClientHead client = clientsBox.get(sessionId); |
|
|
|
|
|
|
|
if (client != null) { |
|
|
|
|
|
|
|
client.disconnect(); |
|
|
|
|
|
|
|
log.debug("{} removed due to ping timeout", sessionId); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}, configuration.getPingTimeout() + configuration.getPingInterval(), TimeUnit.MILLISECONDS); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public ChannelFuture send(Packet packet, Transport transport) { |
|
|
|
public ChannelFuture send(Packet packet, Transport transport) { |
|
|
@ -152,6 +158,8 @@ public class ClientHead { |
|
|
|
|
|
|
|
|
|
|
|
public void removeNamespaceClient(NamespaceClient client) { |
|
|
|
public void removeNamespaceClient(NamespaceClient client) { |
|
|
|
namespaceClients.remove(client.getNamespace()); |
|
|
|
namespaceClients.remove(client.getNamespace()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 清空到最后一个的时候, 断开连接。 调用 initializer.disconnect
|
|
|
|
if (namespaceClients.isEmpty()) { |
|
|
|
if (namespaceClients.isEmpty()) { |
|
|
|
disconnectableHub.onDisconnect(this); |
|
|
|
disconnectableHub.onDisconnect(this); |
|
|
|
} |
|
|
|
} |
|
|
@ -268,4 +276,23 @@ public class ClientHead { |
|
|
|
return lastBinaryPacket; |
|
|
|
return lastBinaryPacket; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private class TimeOutTask implements Runnable{ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ClientHead client; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public TimeOutTask(ClientHead client) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
assert client != null; |
|
|
|
|
|
|
|
this.client = client; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public void run() { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (client != null && !client.disconnected.get()) { |
|
|
|
|
|
|
|
client.disconnect(); |
|
|
|
|
|
|
|
log.debug("{} removed due to ping timeout", client.getSessionId()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|