From 90926a6e5aa47a147b037d28a6f0b0a7a6917034 Mon Sep 17 00:00:00 2001 From: richie Date: Fri, 30 Mar 2018 11:32:58 +0800 Subject: [PATCH] =?UTF-8?q?socketio=E5=8A=A0=E5=85=A5redis=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socketio/store/RedissonPubSubStore.java | 180 +++++++++--------- .../third/socketio/store/RedissonStore.java | 102 +++++----- .../socketio/store/RedissonStoreFactory.java | 152 +++++++-------- 3 files changed, 217 insertions(+), 217 deletions(-) diff --git a/fine-socketio/src/com/fr/third/socketio/store/RedissonPubSubStore.java b/fine-socketio/src/com/fr/third/socketio/store/RedissonPubSubStore.java index 88ac66fcc..9535d4706 100644 --- a/fine-socketio/src/com/fr/third/socketio/store/RedissonPubSubStore.java +++ b/fine-socketio/src/com/fr/third/socketio/store/RedissonPubSubStore.java @@ -1,90 +1,90 @@ -///** -// * Copyright 2012 Nikita Koksharov -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package com.fr.third.socketio.store; -// -//import java.util.Queue; -//import java.util.concurrent.ConcurrentLinkedQueue; -//import java.util.concurrent.ConcurrentMap; -// -//import com.fr.third.socketio.store.pubsub.PubSubListener; -//import com.fr.third.socketio.store.pubsub.PubSubMessage; -//import com.fr.third.socketio.store.pubsub.PubSubStore; -//import com.fr.third.socketio.store.pubsub.PubSubType; -//import org.redisson.api.RTopic; -//import org.redisson.api.RedissonClient; -//import org.redisson.api.listener.MessageListener; -// -//import io.netty.util.internal.PlatformDependent; -// -//public class RedissonPubSubStore implements PubSubStore { -// -// private final RedissonClient redissonPub; -// private final RedissonClient redissonSub; -// private final Long nodeId; -// -// private final ConcurrentMap> map = PlatformDependent.newConcurrentHashMap(); -// -// public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSub, Long nodeId) { -// this.redissonPub = redissonPub; -// this.redissonSub = redissonSub; -// this.nodeId = nodeId; -// } -// -// @Override -// public void publish(PubSubType type, PubSubMessage msg) { -// msg.setNodeId(nodeId); -// redissonPub.getTopic(type.toString()).publish(msg); -// } -// -// @Override -// public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { -// String name = type.toString(); -// RTopic topic = redissonSub.getTopic(name); -// int regId = topic.addListener(new MessageListener() { -// @Override -// public void onMessage(String channel, T msg) { -// if (!nodeId.equals(msg.getNodeId())) { -// listener.onMessage(msg); -// } -// } -// }); -// -// Queue list = map.get(name); -// if (list == null) { -// list = new ConcurrentLinkedQueue(); -// Queue oldList = map.putIfAbsent(name, list); -// if (oldList != null) { -// list = oldList; -// } -// } -// list.add(regId); -// } -// -// @Override -// public void unsubscribe(PubSubType type) { -// String name = type.toString(); -// Queue regIds = map.remove(name); -// RTopic topic = redissonSub.getTopic(name); -// for (Integer id : regIds) { -// topic.removeListener(id); -// } -// } -// -// @Override -// public void shutdown() { -// } -// -//} +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.fr.third.socketio.store; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import com.fr.third.socketio.store.pubsub.PubSubListener; +import com.fr.third.socketio.store.pubsub.PubSubMessage; +import com.fr.third.socketio.store.pubsub.PubSubStore; +import com.fr.third.socketio.store.pubsub.PubSubType; +import com.fr.third.redisson.api.RTopic; +import com.fr.third.redisson.api.RedissonClient; +import com.fr.third.redisson.api.listener.MessageListener; + +import io.netty.util.internal.PlatformDependent; + +public class RedissonPubSubStore implements PubSubStore { + + private final RedissonClient redissonPub; + private final RedissonClient redissonSub; + private final Long nodeId; + + private final ConcurrentMap> map = PlatformDependent.newConcurrentHashMap(); + + public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSub, Long nodeId) { + this.redissonPub = redissonPub; + this.redissonSub = redissonSub; + this.nodeId = nodeId; + } + + @Override + public void publish(PubSubType type, PubSubMessage msg) { + msg.setNodeId(nodeId); + redissonPub.getTopic(type.toString()).publish(msg); + } + + @Override + public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { + String name = type.toString(); + RTopic topic = redissonSub.getTopic(name); + int regId = topic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, T msg) { + if (!nodeId.equals(msg.getNodeId())) { + listener.onMessage(msg); + } + } + }); + + Queue list = map.get(name); + if (list == null) { + list = new ConcurrentLinkedQueue(); + Queue oldList = map.putIfAbsent(name, list); + if (oldList != null) { + list = oldList; + } + } + list.add(regId); + } + + @Override + public void unsubscribe(PubSubType type) { + String name = type.toString(); + Queue regIds = map.remove(name); + RTopic topic = redissonSub.getTopic(name); + for (Integer id : regIds) { + topic.removeListener(id); + } + } + + @Override + public void shutdown() { + } + +} diff --git a/fine-socketio/src/com/fr/third/socketio/store/RedissonStore.java b/fine-socketio/src/com/fr/third/socketio/store/RedissonStore.java index 7e92a34a0..d3d3ae0bb 100644 --- a/fine-socketio/src/com/fr/third/socketio/store/RedissonStore.java +++ b/fine-socketio/src/com/fr/third/socketio/store/RedissonStore.java @@ -1,51 +1,51 @@ -///** -// * Copyright 2012 Nikita Koksharov -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package com.fr.third.socketio.store; -// -//import java.util.Map; -//import java.util.UUID; -// -//import org.redisson.api.RedissonClient; -// -//public class RedissonStore implements Store { -// -// private final Map map; -// -// public RedissonStore(UUID sessionId, RedissonClient redisson) { -// this.map = redisson.getMap(sessionId.toString()); -// } -// -// @Override -// public void set(String key, Object value) { -// map.put(key, value); -// } -// -// @Override -// public T get(String key) { -// return (T) map.get(key); -// } -// -// @Override -// public boolean has(String key) { -// return map.containsKey(key); -// } -// -// @Override -// public void del(String key) { -// map.remove(key); -// } -// -//} +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.fr.third.socketio.store; + +import java.util.Map; +import java.util.UUID; + +import com.fr.third.redisson.api.RedissonClient; + +public class RedissonStore implements Store { + + private final Map map; + + public RedissonStore(UUID sessionId, RedissonClient redisson) { + this.map = redisson.getMap(sessionId.toString()); + } + + @Override + public void set(String key, Object value) { + map.put(key, value); + } + + @Override + public T get(String key) { + return (T) map.get(key); + } + + @Override + public boolean has(String key) { + return map.containsKey(key); + } + + @Override + public void del(String key) { + map.remove(key); + } + +} diff --git a/fine-socketio/src/com/fr/third/socketio/store/RedissonStoreFactory.java b/fine-socketio/src/com/fr/third/socketio/store/RedissonStoreFactory.java index d1f824690..2486c0fc5 100644 --- a/fine-socketio/src/com/fr/third/socketio/store/RedissonStoreFactory.java +++ b/fine-socketio/src/com/fr/third/socketio/store/RedissonStoreFactory.java @@ -1,76 +1,76 @@ -///** -// * Copyright 2012 Nikita Koksharov -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package com.fr.third.socketio.store; -// -//import java.util.Map; -//import java.util.UUID; -// -//import com.fr.third.socketio.store.pubsub.BaseStoreFactory; -//import com.fr.third.socketio.store.pubsub.PubSubStore; -//import org.redisson.Redisson; -//import org.redisson.api.RedissonClient; -// -//public class RedissonStoreFactory extends BaseStoreFactory { -// -// private final RedissonClient redisClient; -// private final RedissonClient redisPub; -// private final RedissonClient redisSub; -// -// private final PubSubStore pubSubStore; -// -// public RedissonStoreFactory() { -// this(Redisson.create()); -// } -// -// public RedissonStoreFactory(RedissonClient redisson) { -// this.redisClient = redisson; -// this.redisPub = redisson; -// this.redisSub = redisson; -// -// this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId()); -// } -// -// public RedissonStoreFactory(Redisson redisClient, Redisson redisPub, Redisson redisSub) { -// this.redisClient = redisClient; -// this.redisPub = redisPub; -// this.redisSub = redisSub; -// -// this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId()); -// } -// -// @Override -// public Store createStore(UUID sessionId) { -// return new RedissonStore(sessionId, redisClient); -// } -// -// @Override -// public PubSubStore pubSubStore() { -// return pubSubStore; -// } -// -// @Override -// public void shutdown() { -// redisClient.shutdown(); -// redisPub.shutdown(); -// redisSub.shutdown(); -// } -// -// @Override -// public Map createMap(String name) { -// return redisClient.getMap(name); -// } -// -//} +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.fr.third.socketio.store; + +import java.util.Map; +import java.util.UUID; + +import com.fr.third.socketio.store.pubsub.BaseStoreFactory; +import com.fr.third.socketio.store.pubsub.PubSubStore; +import com.fr.third.redisson.Redisson; +import com.fr.third.redisson.api.RedissonClient; + +public class RedissonStoreFactory extends BaseStoreFactory { + + private final RedissonClient redisClient; + private final RedissonClient redisPub; + private final RedissonClient redisSub; + + private final PubSubStore pubSubStore; + + public RedissonStoreFactory() { + this(Redisson.create()); + } + + public RedissonStoreFactory(RedissonClient redisson) { + this.redisClient = redisson; + this.redisPub = redisson; + this.redisSub = redisson; + + this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId()); + } + + public RedissonStoreFactory(Redisson redisClient, Redisson redisPub, Redisson redisSub) { + this.redisClient = redisClient; + this.redisPub = redisPub; + this.redisSub = redisSub; + + this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId()); + } + + @Override + public Store createStore(UUID sessionId) { + return new RedissonStore(sessionId, redisClient); + } + + @Override + public PubSubStore pubSubStore() { + return pubSubStore; + } + + @Override + public void shutdown() { + redisClient.shutdown(); + redisPub.shutdown(); + redisSub.shutdown(); + } + + @Override + public Map createMap(String name) { + return redisClient.getMap(name); + } + +}