Browse Source

socketio加入redis集群支持

10.0
richie 7 years ago
parent
commit
90926a6e5a
  1. 180
      fine-socketio/src/com/fr/third/socketio/store/RedissonPubSubStore.java
  2. 102
      fine-socketio/src/com/fr/third/socketio/store/RedissonStore.java
  3. 152
      fine-socketio/src/com/fr/third/socketio/store/RedissonStoreFactory.java

180
fine-socketio/src/com/fr/third/socketio/store/RedissonPubSubStore.java

@ -1,90 +1,90 @@
///**
// * Copyright 2012 Nikita Koksharov
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//package com.fr.third.socketio.store;
//
//import java.util.Queue;
//import java.util.concurrent.ConcurrentLinkedQueue;
//import java.util.concurrent.ConcurrentMap;
//
//import com.fr.third.socketio.store.pubsub.PubSubListener;
//import com.fr.third.socketio.store.pubsub.PubSubMessage;
//import com.fr.third.socketio.store.pubsub.PubSubStore;
//import com.fr.third.socketio.store.pubsub.PubSubType;
//import org.redisson.api.RTopic;
//import org.redisson.api.RedissonClient;
//import org.redisson.api.listener.MessageListener;
//
//import io.netty.util.internal.PlatformDependent;
//
//public class RedissonPubSubStore implements PubSubStore {
//
// private final RedissonClient redissonPub;
// private final RedissonClient redissonSub;
// private final Long nodeId;
//
// private final ConcurrentMap<String, Queue<Integer>> map = PlatformDependent.newConcurrentHashMap();
//
// public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSub, Long nodeId) {
// this.redissonPub = redissonPub;
// this.redissonSub = redissonSub;
// this.nodeId = nodeId;
// }
//
// @Override
// public void publish(PubSubType type, PubSubMessage msg) {
// msg.setNodeId(nodeId);
// redissonPub.getTopic(type.toString()).publish(msg);
// }
//
// @Override
// public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
// String name = type.toString();
// RTopic<T> topic = redissonSub.getTopic(name);
// int regId = topic.addListener(new MessageListener<T>() {
// @Override
// public void onMessage(String channel, T msg) {
// if (!nodeId.equals(msg.getNodeId())) {
// listener.onMessage(msg);
// }
// }
// });
//
// Queue<Integer> list = map.get(name);
// if (list == null) {
// list = new ConcurrentLinkedQueue<Integer>();
// Queue<Integer> oldList = map.putIfAbsent(name, list);
// if (oldList != null) {
// list = oldList;
// }
// }
// list.add(regId);
// }
//
// @Override
// public void unsubscribe(PubSubType type) {
// String name = type.toString();
// Queue<Integer> regIds = map.remove(name);
// RTopic<Object> topic = redissonSub.getTopic(name);
// for (Integer id : regIds) {
// topic.removeListener(id);
// }
// }
//
// @Override
// public void shutdown() {
// }
//
//}
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.socketio.store;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import com.fr.third.socketio.store.pubsub.PubSubListener;
import com.fr.third.socketio.store.pubsub.PubSubMessage;
import com.fr.third.socketio.store.pubsub.PubSubStore;
import com.fr.third.socketio.store.pubsub.PubSubType;
import com.fr.third.redisson.api.RTopic;
import com.fr.third.redisson.api.RedissonClient;
import com.fr.third.redisson.api.listener.MessageListener;
import io.netty.util.internal.PlatformDependent;
public class RedissonPubSubStore implements PubSubStore {
private final RedissonClient redissonPub;
private final RedissonClient redissonSub;
private final Long nodeId;
private final ConcurrentMap<String, Queue<Integer>> map = PlatformDependent.newConcurrentHashMap();
public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSub, Long nodeId) {
this.redissonPub = redissonPub;
this.redissonSub = redissonSub;
this.nodeId = nodeId;
}
@Override
public void publish(PubSubType type, PubSubMessage msg) {
msg.setNodeId(nodeId);
redissonPub.getTopic(type.toString()).publish(msg);
}
@Override
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
RTopic<T> topic = redissonSub.getTopic(name);
int regId = topic.addListener(new MessageListener<T>() {
@Override
public void onMessage(String channel, T msg) {
if (!nodeId.equals(msg.getNodeId())) {
listener.onMessage(msg);
}
}
});
Queue<Integer> list = map.get(name);
if (list == null) {
list = new ConcurrentLinkedQueue<Integer>();
Queue<Integer> oldList = map.putIfAbsent(name, list);
if (oldList != null) {
list = oldList;
}
}
list.add(regId);
}
@Override
public void unsubscribe(PubSubType type) {
String name = type.toString();
Queue<Integer> regIds = map.remove(name);
RTopic<Object> topic = redissonSub.getTopic(name);
for (Integer id : regIds) {
topic.removeListener(id);
}
}
@Override
public void shutdown() {
}
}

102
fine-socketio/src/com/fr/third/socketio/store/RedissonStore.java

@ -1,51 +1,51 @@
///**
// * Copyright 2012 Nikita Koksharov
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//package com.fr.third.socketio.store;
//
//import java.util.Map;
//import java.util.UUID;
//
//import org.redisson.api.RedissonClient;
//
//public class RedissonStore implements Store {
//
// private final Map<String, Object> map;
//
// public RedissonStore(UUID sessionId, RedissonClient redisson) {
// this.map = redisson.getMap(sessionId.toString());
// }
//
// @Override
// public void set(String key, Object value) {
// map.put(key, value);
// }
//
// @Override
// public <T> T get(String key) {
// return (T) map.get(key);
// }
//
// @Override
// public boolean has(String key) {
// return map.containsKey(key);
// }
//
// @Override
// public void del(String key) {
// map.remove(key);
// }
//
//}
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.socketio.store;
import java.util.Map;
import java.util.UUID;
import com.fr.third.redisson.api.RedissonClient;
public class RedissonStore implements Store {
private final Map<String, Object> map;
public RedissonStore(UUID sessionId, RedissonClient redisson) {
this.map = redisson.getMap(sessionId.toString());
}
@Override
public void set(String key, Object value) {
map.put(key, value);
}
@Override
public <T> T get(String key) {
return (T) map.get(key);
}
@Override
public boolean has(String key) {
return map.containsKey(key);
}
@Override
public void del(String key) {
map.remove(key);
}
}

152
fine-socketio/src/com/fr/third/socketio/store/RedissonStoreFactory.java

@ -1,76 +1,76 @@
///**
// * Copyright 2012 Nikita Koksharov
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//package com.fr.third.socketio.store;
//
//import java.util.Map;
//import java.util.UUID;
//
//import com.fr.third.socketio.store.pubsub.BaseStoreFactory;
//import com.fr.third.socketio.store.pubsub.PubSubStore;
//import org.redisson.Redisson;
//import org.redisson.api.RedissonClient;
//
//public class RedissonStoreFactory extends BaseStoreFactory {
//
// private final RedissonClient redisClient;
// private final RedissonClient redisPub;
// private final RedissonClient redisSub;
//
// private final PubSubStore pubSubStore;
//
// public RedissonStoreFactory() {
// this(Redisson.create());
// }
//
// public RedissonStoreFactory(RedissonClient redisson) {
// this.redisClient = redisson;
// this.redisPub = redisson;
// this.redisSub = redisson;
//
// this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId());
// }
//
// public RedissonStoreFactory(Redisson redisClient, Redisson redisPub, Redisson redisSub) {
// this.redisClient = redisClient;
// this.redisPub = redisPub;
// this.redisSub = redisSub;
//
// this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId());
// }
//
// @Override
// public Store createStore(UUID sessionId) {
// return new RedissonStore(sessionId, redisClient);
// }
//
// @Override
// public PubSubStore pubSubStore() {
// return pubSubStore;
// }
//
// @Override
// public void shutdown() {
// redisClient.shutdown();
// redisPub.shutdown();
// redisSub.shutdown();
// }
//
// @Override
// public <K, V> Map<K, V> createMap(String name) {
// return redisClient.getMap(name);
// }
//
//}
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.socketio.store;
import java.util.Map;
import java.util.UUID;
import com.fr.third.socketio.store.pubsub.BaseStoreFactory;
import com.fr.third.socketio.store.pubsub.PubSubStore;
import com.fr.third.redisson.Redisson;
import com.fr.third.redisson.api.RedissonClient;
public class RedissonStoreFactory extends BaseStoreFactory {
private final RedissonClient redisClient;
private final RedissonClient redisPub;
private final RedissonClient redisSub;
private final PubSubStore pubSubStore;
public RedissonStoreFactory() {
this(Redisson.create());
}
public RedissonStoreFactory(RedissonClient redisson) {
this.redisClient = redisson;
this.redisPub = redisson;
this.redisSub = redisson;
this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId());
}
public RedissonStoreFactory(Redisson redisClient, Redisson redisPub, Redisson redisSub) {
this.redisClient = redisClient;
this.redisPub = redisPub;
this.redisSub = redisSub;
this.pubSubStore = new RedissonPubSubStore(redisPub, redisSub, getNodeId());
}
@Override
public Store createStore(UUID sessionId) {
return new RedissonStore(sessionId, redisClient);
}
@Override
public PubSubStore pubSubStore() {
return pubSubStore;
}
@Override
public void shutdown() {
redisClient.shutdown();
redisPub.shutdown();
redisSub.shutdown();
}
@Override
public <K, V> Map<K, V> createMap(String name) {
return redisClient.getMap(name);
}
}

Loading…
Cancel
Save