帆软使用的第三方框架。
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1231 lines
53 KiB

/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.fr.third.org.redisson;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.fr.third.org.redisson.api.LocalCachedMapOptions;
import com.fr.third.org.redisson.api.RFuture;
import com.fr.third.org.redisson.api.RLocalCachedMap;
import com.fr.third.org.redisson.api.RScoredSortedSet;
import com.fr.third.org.redisson.api.RTopic;
import com.fr.third.org.redisson.api.RedissonClient;
import com.fr.third.org.redisson.api.listener.BaseStatusListener;
import com.fr.third.org.redisson.api.listener.MessageListener;
import com.fr.third.org.redisson.cache.Cache;
import com.fr.third.org.redisson.cache.LFUCacheMap;
import com.fr.third.org.redisson.cache.LRUCacheMap;
import com.fr.third.org.redisson.cache.LocalCachedMapClear;
import com.fr.third.org.redisson.cache.LocalCachedMapInvalidate;
import com.fr.third.org.redisson.cache.LocalCachedMapUpdate;
import com.fr.third.org.redisson.cache.LocalCachedMessageCodec;
import com.fr.third.org.redisson.cache.NoneCacheMap;
import com.fr.third.org.redisson.cache.ReferenceCacheMap;
import com.fr.third.org.redisson.client.codec.ByteArrayCodec;
import com.fr.third.org.redisson.client.codec.Codec;
import com.fr.third.org.redisson.client.codec.LongCodec;
import com.fr.third.org.redisson.client.codec.StringCodec;
import com.fr.third.org.redisson.client.protocol.RedisCommand;
import com.fr.third.org.redisson.client.protocol.RedisCommands;
import com.fr.third.org.redisson.client.protocol.convertor.NumberConvertor;
import com.fr.third.org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import com.fr.third.org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import com.fr.third.org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import com.fr.third.org.redisson.misc.Hash;
import com.fr.third.org.redisson.misc.RPromise;
import com.fr.third.org.redisson.misc.RedissonPromise;
import com.fr.third.org.redisson.api.LocalCachedMapOptions;
import com.fr.third.org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import com.fr.third.org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import com.fr.third.org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import com.fr.third.org.redisson.api.RFuture;
import com.fr.third.org.redisson.api.RLocalCachedMap;
import com.fr.third.org.redisson.api.RScoredSortedSet;
import com.fr.third.org.redisson.api.RTopic;
import com.fr.third.org.redisson.api.RedissonClient;
import com.fr.third.org.redisson.api.listener.BaseStatusListener;
import com.fr.third.org.redisson.api.listener.MessageListener;
import com.fr.third.org.redisson.cache.Cache;
import com.fr.third.org.redisson.cache.LFUCacheMap;
import com.fr.third.org.redisson.cache.LRUCacheMap;
import com.fr.third.org.redisson.cache.LocalCachedMapClear;
import com.fr.third.org.redisson.cache.LocalCachedMapInvalidate;
import com.fr.third.org.redisson.cache.LocalCachedMapUpdate;
import com.fr.third.org.redisson.cache.LocalCachedMessageCodec;
import com.fr.third.org.redisson.cache.NoneCacheMap;
import com.fr.third.org.redisson.cache.ReferenceCacheMap;
import com.fr.third.org.redisson.client.codec.ByteArrayCodec;
import com.fr.third.org.redisson.client.codec.Codec;
import com.fr.third.org.redisson.client.codec.LongCodec;
import com.fr.third.org.redisson.client.codec.StringCodec;
import com.fr.third.org.redisson.client.protocol.RedisCommand;
import com.fr.third.org.redisson.client.protocol.RedisCommand.ValueType;
import com.fr.third.org.redisson.client.protocol.RedisCommands;
import com.fr.third.org.redisson.client.protocol.convertor.NumberConvertor;
import com.fr.third.org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import com.fr.third.org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import com.fr.third.org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import com.fr.third.org.redisson.command.CommandAsyncExecutor;
import com.fr.third.org.redisson.eviction.EvictionScheduler;
import com.fr.third.org.redisson.misc.Hash;
import com.fr.third.org.redisson.misc.RPromise;
import com.fr.third.org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
@SuppressWarnings("serial")
public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements RLocalCachedMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(RedissonLocalCachedMap.class);
/**
 * Local-cache key that wraps the hash bytes of an encoded map key.
 * Equality and hash code are computed from the array contents, not
 * from the array identity.
 */
public static class CacheKey implements Serializable {

    private final byte[] keyHash;

    /**
     * @param keyHash hash bytes of the encoded map key; stored as-is (no copy)
     */
    public CacheKey(byte[] keyHash) {
        super();
        this.keyHash = keyHash;
    }

    /**
     * @return the wrapped hash array (the same instance passed to the constructor)
     */
    public byte[] getKeyHash() {
        return keyHash;
    }

    @Override
    public int hashCode() {
        // Same value as the classic "prime * 1 + hash" formulation with prime = 31.
        return 31 + Arrays.hashCode(keyHash);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Arrays.equals(keyHash, ((CacheKey) obj).keyHash);
    }

    @Override
    public String toString() {
        return "CacheKey [keyHash=" + Arrays.toString(keyHash) + "]";
    }

}
/**
 * Local-cache entry holding the decoded map key together with its value.
 * <p>
 * Equality is based on the value only (the key is ignored), which is what
 * allows a probe such as {@code new CacheValue(null, value)} to match a
 * cached entry. {@link #hashCode()} follows the same rule so that the
 * equals/hashCode contract holds.
 */
public static class CacheValue implements Serializable {

    private final Object key;
    private final Object value;

    /**
     * @param key   decoded map key (may be {@code null} for lookup probes)
     * @param value decoded map value
     */
    public CacheValue(Object key, Object value) {
        super();
        this.key = key;
        this.value = value;
    }

    /**
     * @return the decoded map key this entry was created with
     */
    public Object getKey() {
        return key;
    }

    /**
     * @return the decoded map value this entry was created with
     */
    public Object getValue() {
        return value;
    }

    /**
     * Hash code derived from the value only, consistent with {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return 31 + (value == null ? 0 : value.hashCode());
    }

    /**
     * Two cache values are equal when they are of the same class and their
     * values are equal (or both {@code null}); keys are not compared.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        CacheValue other = (CacheValue) obj;
        if (value == null) {
            return other.value == null;
        }
        return value.equals(other.value);
    }

    @Override
    public String toString() {
        return "CacheValue [key=" + key + ", value=" + value + "]";
    }

}
// EVAL command descriptor decoding the reply into a set using the map-key value type (used by readAllValuesAsync).
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>(), RedisCommand.ValueType.MAP_KEY);
// EVAL command descriptor decoding the reply into a set of key/value entries (used by readAllEntrySetAsync).
private static final RedisCommand<Set<Entry<Object, Object>>> ALL_ENTRIES = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), RedisCommand.ValueType.MAP);
// EVAL command descriptor decoding the reply into a map (used by readAllMapAsync).
private static final RedisCommand<Map<Object, Object>> ALL_MAP = new RedisCommand<Map<Object, Object>>("EVAL", new ObjectMapReplayDecoder(), RedisCommand.ValueType.MAP);
// Time window (10 minutes, in milliseconds) of the update log that is considered usable on reconnect.
private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);
// Random 16-byte id of this map instance; compared against the excluded id of incoming invalidation messages.
private byte[] instanceId;
// Topic used to publish and receive clear/invalidate/update messages for the local cache.
private RTopic<Object> invalidationTopic;
// The local (in-process) cache; implementation is chosen by createCache from the eviction policy.
private Cache<CacheKey, CacheValue> cache;
// Passed to the Lua scripts: 0 = no publish, 1 = publish only (sync strategy is not NONE),
// 2 = publish and record the change in the update log (reconnection strategy is LOAD).
private int invalidateEntryOnChange;
// Listener ids returned by the topic; 0 means the listener was never registered (see destroy()).
private int syncListenerId;
private int reconnectionListenerId;
// Timestamp (ms) of the last processed topic message; only updated when the reconnection strategy is LOAD.
private volatile long lastInvalidate;
// Sync strategy taken from the options in init(); selects between update and invalidate messages.
private LocalCachedMapOptions.SyncStrategy syncStrategy;
// Codec used for messages sent over the invalidation topic.
private final Codec topicCodec = new LocalCachedMessageCodec();
/**
 * Creates a locally cached map without an explicit codec: delegates to the
 * parent constructor and then sets up the local cache and topic listeners
 * through {@code init}.
 *
 * @param commandExecutor   executor passed to the parent map
 * @param name              map name
 * @param options           local cache options (strategies, eviction policy, sizes)
 * @param evictionScheduler scheduler handed to {@code init} for the update log
 * @param redisson          client passed to the parent map and to the listeners
 */
protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson, options);
init(name, options, redisson, evictionScheduler);
}
/**
 * Creates a locally cached map with an explicit codec: delegates to the
 * parent constructor and then sets up the local cache and topic listeners
 * through {@code init}.
 *
 * @param codec             codec passed to the parent map
 * @param connectionManager command executor passed to the parent map
 * @param name              map name
 * @param options           local cache options (strategies, eviction policy, sizes)
 * @param evictionScheduler scheduler handed to {@code init} for the update log
 * @param redisson          client passed to the parent map and to the listeners
 */
protected RedissonLocalCachedMap(Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(codec, connectionManager, name, redisson, options);
init(name, options, redisson, evictionScheduler);
}
/**
 * Shared constructor logic: assigns the instance id, derives the
 * invalidation mode from the options, creates the local cache and
 * registers the topic listeners.
 * <p>
 * Invalidation mode: 2 when the reconnection strategy is LOAD (the update
 * log is also scheduled for eviction), otherwise 1 when the sync strategy
 * is not NONE, otherwise left at its default.
 */
private void init(String name, LocalCachedMapOptions<K, V> options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
    instanceId = generateId();
    syncStrategy = options.getSyncStrategy();

    final boolean syncEnabled = syncStrategy != LocalCachedMapOptions.SyncStrategy.NONE;
    final boolean loadOnReconnect =
            options.getReconnectionStrategy() == LocalCachedMapOptions.ReconnectionStrategy.LOAD;

    if (loadOnReconnect) {
        // LOAD takes precedence over plain publishing.
        invalidateEntryOnChange = 2;
        long logEvictionDelay = cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1);
        evictionScheduler.schedule(getUpdatesLogName(), logEvictionDelay);
    } else if (syncEnabled) {
        invalidateEntryOnChange = 1;
    }

    cache = createCache(options);
    addListeners(name, options, redisson);
}
/**
 * Creates the invalidation topic and registers up to two listeners on it:
 * a status listener that repairs the local cache after a (re)subscription
 * when a reconnection strategy is configured, and a message listener that
 * applies clear/invalidate/update messages when a sync strategy is configured.
 *
 * @param name     map name; the topic name is derived from it with the "topic" suffix
 * @param options  options supplying the reconnection and sync strategies
 * @param redisson client used to read the update log on reconnect
 */
private void addListeners(String name, final LocalCachedMapOptions<K, V> options, final RedissonClient redisson) {
invalidationTopic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, "topic"));
if (options.getReconnectionStrategy() != LocalCachedMapOptions.ReconnectionStrategy.NONE) {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
// NOTE(review): presumably invoked on every (re)subscription to the topic channel - confirm against BaseStatusListener.
@Override
public void onSubscribe(String channel) {
// CLEAR strategy: drop the whole local cache.
if (options.getReconnectionStrategy() == LocalCachedMapOptions.ReconnectionStrategy.CLEAR) {
cache.clear();
}
if (options.getReconnectionStrategy() == LocalCachedMapOptions.ReconnectionStrategy.LOAD
// check if instance has already been used
&& lastInvalidate > 0) {
// The last processed message is older than the update-log window: the log cannot be trusted, clear everything.
if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) {
cache.clear();
return;
}
isExistsAsync().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't check existance", future.cause());
return;
}
// The map no longer exists remotely: nothing cached locally can be valid.
if (!future.getNow()) {
cache.clear();
return;
}
// Read every update-log entry scored at or after the last processed message time.
RScoredSortedSet<byte[]> logs = redisson.getScoredSortedSet(getUpdatesLogName(), ByteArrayCodec.INSTANCE);
logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true)
.addListener(new FutureListener<Collection<byte[]>>() {
@Override
public void operationComplete(Future<Collection<byte[]>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't load update log", future.cause());
return;
}
for (byte[] entry : future.getNow()) {
// The first 16 bytes of a log entry are taken as the key hash (see generateLogEntryId).
byte[] keyHash = Arrays.copyOf(entry, 16);
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
});
}
});
}
}
});
}
if (options.getSyncStrategy() != LocalCachedMapOptions.SyncStrategy.NONE) {
syncListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
// Clear message: drop the whole local cache.
if (msg instanceof LocalCachedMapClear) {
cache.clear();
}
// Invalidate message: remove the listed keys unless this instance is the excluded (originating) one.
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
}
// Update message: decode each key/value pair and store it in the local cache.
if (msg instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;
for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
// The cache key is hashed from the encoded key bytes before they are decoded.
CacheKey cacheKey = toCacheKey(keyBuf);
Object key = codec.getMapKeyDecoder().decode(keyBuf, null);
Object value = codec.getMapValueDecoder().decode(valueBuf, null);
cache.put(cacheKey, new CacheValue(key, value));
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
keyBuf.release();
valueBuf.release();
}
}
}
// Remember when the last message was processed; used by the LOAD reconnection logic above.
if (options.getReconnectionStrategy() == LocalCachedMapOptions.ReconnectionStrategy.LOAD) {
lastInvalidate = System.currentTimeMillis();
}
}
});
}
}
/**
 * Builds the local cache implementation that matches the configured
 * eviction policy.
 *
 * @param options options supplying the eviction policy, cache size, TTL and max idle time
 * @return a new cache instance for the policy
 * @throws IllegalArgumentException if the policy is not one of the handled values
 */
protected Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions<K, V> options) {
    final LocalCachedMapOptions.EvictionPolicy policy = options.getEvictionPolicy();
    // A null policy must fall through to the IllegalArgumentException below, not an NPE from the switch.
    if (policy != null) {
        switch (policy) {
            case NONE:
                return new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
            case LRU:
                return new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
            case LFU:
                return new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
            case SOFT:
                return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
            case WEAK:
                return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
            default:
                break;
        }
    }
    throw new IllegalArgumentException("Invalid eviction policy: " + policy);
}
/**
 * Encodes the given map key, hashes the encoded bytes into a cache key and
 * releases the temporary buffer.
 */
private CacheKey toCacheKey(Object key) {
    final ByteBuf keyState = encodeMapKey(key);
    try {
        byte[] hash = Hash.hash128toArray(keyState);
        return new CacheKey(hash);
    } finally {
        // The buffer was allocated here, so it is released here regardless of the outcome.
        keyState.release();
    }
}
/**
 * Hashes an already encoded map key into a cache key. The buffer is not
 * released by this method.
 */
private CacheKey toCacheKey(ByteBuf encodedKey) {
    final byte[] hash = Hash.hash128toArray(encodedKey);
    return new CacheKey(hash);
}
/**
 * Answers from the local cache when the key is cached; otherwise delegates
 * to the parent implementation.
 */
@Override
public RFuture<Boolean> containsKeyAsync(Object key) {
    checkKey(key);

    if (cache.containsKey(toCacheKey(key))) {
        return RedissonPromise.newSucceededFuture(true);
    }
    return super.containsKeyAsync(key);
}
/**
 * Answers from the local cache when an equal value is cached; otherwise
 * delegates to the parent implementation.
 */
@Override
public RFuture<Boolean> containsValueAsync(Object value) {
    checkValue(value);

    // The probe carries no key; CacheValue equality only compares values.
    if (cache.containsValue(new CacheValue(null, value))) {
        return RedissonPromise.newSucceededFuture(true);
    }
    return super.containsValueAsync(value);
}
/**
 * Returns the locally cached value when present and non-null; otherwise
 * loads it through the parent implementation and caches a non-null result.
 */
@Override
public RFuture<V> getAsync(final Object key) {
    checkKey(key);

    final CacheKey cacheKey = toCacheKey(key);
    final CacheValue cached = cache.get(cacheKey);
    final Object cachedValue = (cached == null) ? null : cached.getValue();
    if (cachedValue != null) {
        return RedissonPromise.newSucceededFuture((V) cachedValue);
    }

    RFuture<V> remote = super.getAsync((K) key);
    remote.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> loaded) throws Exception {
            if (loaded.isSuccess()) {
                V value = loaded.getNow();
                if (value != null) {
                    cache.put(cacheKey, new CacheValue(key, value));
                }
            }
        }
    });
    return remote;
}
/**
 * @return the name of the update-log key, built by prefixing this map's name
 */
String getUpdatesLogName() {
    final String mapName = getName();
    return prefixName("redisson__cache_updates_log", mapName);
}
/**
 * @return 16 freshly generated random bytes
 */
protected static byte[] generateId() {
    final byte[] randomBytes = new byte[16];
    // TODO JDK UPGRADE replace to native ThreadLocalRandom
    PlatformDependent.threadLocalRandom().nextBytes(randomBytes);
    return randomBytes;
}
/**
 * Builds an update-log entry id with the layout
 * {@code <keyHash bytes> ':' <8 random bytes>}.
 * <p>
 * The separator and the random suffix are positioned relative to
 * {@code keyHash.length} rather than at fixed offsets, so the layout stays
 * intact for any hash length; for a 16-byte hash the result is laid out
 * exactly as before (separator at index 16, suffix from index 17).
 *
 * @param keyHash hash bytes of the encoded map key
 * @return a new array of {@code keyHash.length + 9} bytes
 */
protected static byte[] generateLogEntryId(byte[] keyHash) {
    final int separatorIndex = keyHash.length;
    byte[] result = new byte[separatorIndex + 1 + 8];
    System.arraycopy(keyHash, 0, result, 0, separatorIndex);
    result[separatorIndex] = ':';
    byte[] id = new byte[8];
    // TODO JDK UPGRADE replace to native ThreadLocalRandom
    PlatformDependent.threadLocalRandom().nextBytes(id);
    System.arraycopy(id, 0, result, separatorIndex + 1, id.length);
    return result;
}
/**
 * Stores the entry in the local cache and then writes it to Redis with a
 * script that returns the previous value and, depending on the
 * invalidation mode, publishes the sync message (mode 1) or additionally
 * records the change in the update log (mode 2).
 * <p>
 * Script arguments: ARGV[1] key, ARGV[2] value, ARGV[3] sync message,
 * ARGV[4] invalidation mode, ARGV[5] timestamp, ARGV[6] log entry id.
 */
@Override
protected RFuture<V> putOperationAsync(K key, V value) {
    final ByteBuf keyState = encodeMapKey(key);
    final ByteBuf valueState = encodeMapValue(value);
    final CacheKey cacheKey = toCacheKey(keyState);
    final byte[] logEntryId = generateLogEntryId(cacheKey.getKeyHash());
    final ByteBuf syncMessage = createSyncMessage(keyState, valueState, cacheKey);

    // The local cache is updated before the remote write is issued.
    cache.put(cacheKey, new CacheValue(key, value));

    final String script =
            "local v = redis.call('hget', KEYS[1], ARGV[1]); "
          + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
          + "if ARGV[4] == '1' then "
              + "redis.call('publish', KEYS[2], ARGV[3]); "
          + "end;"
          + "if ARGV[4] == '2' then "
              + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
              + "redis.call('publish', KEYS[2], ARGV[3]); "
          + "end;"
          + "return v; ";
    final List<Object> scriptKeys =
            Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName());

    return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
            script, scriptKeys,
            keyState, valueState, syncMessage, invalidateEntryOnChange, System.currentTimeMillis(), logEntryId);
}
/**
 * Encodes the message to publish for a changed entry: an update message
 * carrying the encoded key and value when the sync strategy is UPDATE,
 * otherwise an invalidation message carrying this instance's id and the
 * key hash.
 */
protected ByteBuf createSyncMessage(ByteBuf mapKey, ByteBuf mapValue, CacheKey cacheKey) {
    if (syncStrategy != LocalCachedMapOptions.SyncStrategy.UPDATE) {
        return encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
    }
    return encode(new LocalCachedMapUpdate(mapKey, mapValue));
}
/**
 * Stores the entry in the local cache and then writes it to Redis with a
 * script that publishes/logs the change according to the invalidation
 * mode and returns 0 when {@code hset} reports 0, otherwise 1.
 * <p>
 * Script arguments: ARGV[1] key, ARGV[2] value, ARGV[3] sync message,
 * ARGV[4] invalidation mode, ARGV[5] timestamp, ARGV[6] log entry id.
 */
@Override
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
    final ByteBuf keyState = encodeMapKey(key);
    final ByteBuf valueState = encodeMapValue(value);
    final CacheKey cacheKey = toCacheKey(keyState);
    final byte[] logEntryId = generateLogEntryId(cacheKey.getKeyHash());
    final ByteBuf syncMessage = createSyncMessage(keyState, valueState, cacheKey);

    // The local cache is updated before the remote write is issued.
    cache.put(cacheKey, new CacheValue(key, value));

    final String script =
            "if ARGV[4] == '1' then "
              + "redis.call('publish', KEYS[2], ARGV[3]); "
          + "end;"
          + "if ARGV[4] == '2' then "
              + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
              + "redis.call('publish', KEYS[2], ARGV[3]); "
          + "end;"
          + "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
              + "return 0; "
          + "end; "
          + "return 1; ";
    final List<Object> scriptKeys =
            Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName());

    return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
            script, scriptKeys,
            keyState, valueState, syncMessage, invalidateEntryOnChange, System.currentTimeMillis(), logEntryId);
}
/**
 * Unregisters the sync and reconnection listeners from the invalidation
 * topic; an id of 0 means the listener was never registered and is skipped.
 */
@Override
public void destroy() {
    final int[] listenerIds = {syncListenerId, reconnectionListenerId};
    for (int listenerId : listenerIds) {
        if (listenerId != 0) {
            invalidationTopic.removeListener(listenerId);
        }
    }
}
/**
 * Removes the entry from the local cache and then deletes it in Redis
 * with a script that returns the previous value. When the field was
 * actually deleted, the script publishes the invalidation message
 * (mode 1) or additionally records the change in the update log (mode 2).
 * <p>
 * Script arguments: ARGV[1] key, ARGV[2] invalidation message,
 * ARGV[3] invalidation mode, ARGV[4] timestamp, ARGV[5] log entry id.
 */
@Override
public RFuture<V> removeOperationAsync(K key) {
    final ByteBuf keyState = encodeMapKey(key);
    final CacheKey cacheKey = toCacheKey(keyState);
    final byte[] logEntryId = generateLogEntryId(cacheKey.getKeyHash());
    final ByteBuf invalidation = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));

    // The local entry is dropped before the remote delete is issued.
    cache.remove(cacheKey);

    final String script =
            "local v = redis.call('hget', KEYS[1], ARGV[1]); "
          + "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then "
              + "if ARGV[3] == '1' then "
                  + "redis.call('publish', KEYS[2], ARGV[2]); "
              + "end; "
              + "if ARGV[3] == '2' then "
                  + "redis.call('zadd', KEYS[3], ARGV[4], ARGV[5]);"
                  + "redis.call('publish', KEYS[2], ARGV[2]); "
              + "end;"
          + "end; "
          + "return v";
    final List<Object> scriptKeys =
            Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName());

    return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
            script, scriptKeys,
            keyState, invalidation, invalidateEntryOnChange, System.currentTimeMillis(), logEntryId);
}
/**
 * Removes the given keys from the local cache and from Redis, returning
 * the per-key {@code hdel} results in key order. The script and argument
 * layout depend on the invalidation mode:
 * <ul>
 * <li>mode 1: ARGV holds (key, invalidation message) pairs; the message is published for each removed key;</li>
 * <li>mode 2: ARGV[1] is the timestamp, followed by (key, invalidation message, log entry id) triples;
 *     each removed key is also recorded in the update log;</li>
 * <li>otherwise: ARGV holds the keys only and nothing is published.</li>
 * </ul>
 */
@Override
protected RFuture<List<Long>> fastRemoveOperationBatchAsync(@SuppressWarnings("unchecked") K... keys) {
if (invalidateEntryOnChange == 1) {
// Two script arguments per key: encoded key, invalidation message.
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
// Local entry is dropped before the script is sent.
cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for j = 1, #ARGV, 2 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[j]);"
+ "if val == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "end;"
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
params.toArray());
}
if (invalidateEntryOnChange == 2) {
// One leading timestamp, then three script arguments per key: encoded key, invalidation message, log entry id.
List<Object> params = new ArrayList<Object>(keys.length*3);
params.add(System.currentTimeMillis());
for (K k : keys) {
ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
params.add(entryId);
}
// The loop starts at index 2 because ARGV[1] is the shared timestamp used as the log score.
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for j = 2, #ARGV, 3 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[j]);"
+ "if val == 1 then "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[j+2]);"
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "end;"
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
params.toArray());
}
// No invalidation configured: only the encoded keys are sent and nothing is published.
List<Object> params = new ArrayList<Object>(keys.length);
for (K k : keys) {
ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
}
RFuture<List<Long>> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for i = 1, #ARGV, 1 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[i]); "
+ "table.insert(result, val); "
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName()),
params.toArray());
return future;
}
/**
 * Removes the given keys from the local cache and from Redis, returning
 * the number of fields actually deleted. The command and argument layout
 * depend on the invalidation mode:
 * <ul>
 * <li>mode 1: ARGV holds (key, invalidation message) pairs; the message is published for each removed key;</li>
 * <li>mode 2: ARGV[1] is the timestamp, followed by (key, invalidation message, log entry id) triples;
 *     each removed key is also recorded in the update log;</li>
 * <li>otherwise: a plain HDEL with the map name followed by the encoded keys.</li>
 * </ul>
 */
@Override
protected RFuture<Long> fastRemoveOperationAsync(K ... keys) {
if (invalidateEntryOnChange == 1) {
// Two script arguments per key: encoded key, invalidation message.
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
// Local entry is dropped before the script is sent.
cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 1, #ARGV, 2 do "
+ "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
params.toArray());
}
if (invalidateEntryOnChange == 2) {
// One leading timestamp, then three script arguments per key: encoded key, invalidation message, log entry id.
List<Object> params = new ArrayList<Object>(keys.length*3);
params.add(System.currentTimeMillis());
for (K k : keys) {
ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
params.add(entryId);
}
// The loop starts at index 2 because ARGV[1] is the shared timestamp used as the log score.
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 2, #ARGV, 3 do "
+ "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[j+2]);"
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
params.toArray());
}
// No invalidation configured: plain HDEL whose first argument is the map name.
List<Object> params = new ArrayList<Object>(keys.length + 1);
params.add(getName());
for (K k : keys) {
ByteBuf keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.HDEL, params.toArray());
}
/**
 * Clears the local cache and deletes both the map and its update log in
 * Redis. When something was deleted and an invalidation mode is active,
 * a clear message is published so other instances drop their local caches.
 * <p>
 * The result reflects whether anything was deleted. Previously the script
 * also required the invalidation mode to be non-zero before returning 1,
 * so with synchronization disabled a successful deletion was reported as
 * {@code false}.
 * <p>
 * Script arguments: ARGV[1] encoded clear message, ARGV[2] invalidation mode.
 *
 * @return future completing with {@code true} if the map or its update log was deleted
 */
@Override
public RFuture<Boolean> deleteAsync() {
    cache.clear();
    ByteBuf msgEncoded = encode(new LocalCachedMapClear());
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if redis.call('del', KEYS[1], KEYS[3]) > 0 then "
              // Publishing is optional; the return value must not depend on it.
              + "if ARGV[2] ~= '0' then "
                  + "redis.call('publish', KEYS[2], ARGV[1]); "
              + "end; "
              + "return 1;"
          + "end; "
          + "return 0;",
            Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
            msgEncoded, invalidateEntryOnChange);
}
/**
 * Resolves as many keys as possible from the local cache and loads the
 * remaining ones through the parent implementation; loaded entries are
 * added to the local cache and merged into the result.
 */
@Override
public RFuture<Map<K, V>> getAllAsync(Set<K> keys) {
    if (keys.isEmpty()) {
        return RedissonPromise.newSucceededFuture(Collections.<K, V>emptyMap());
    }

    final Map<K, V> result = new HashMap<K, V>();
    // Working copy: keys answered locally are removed, what remains is fetched remotely.
    final Set<K> pendingKeys = new HashSet<K>(keys);
    Iterator<K> pending = pendingKeys.iterator();
    while (pending.hasNext()) {
        K key = pending.next();
        CacheValue cached = cache.get(toCacheKey(key));
        if (cached == null) {
            continue;
        }
        result.put(key, (V) cached.getValue());
        pending.remove();
    }

    final RPromise<Map<K, V>> promise = new RedissonPromise<Map<K, V>>();
    RFuture<Map<K, V>> remote = super.getAllAsync(pendingKeys);
    remote.addListener(new FutureListener<Map<K, V>>() {
        @Override
        public void operationComplete(Future<Map<K, V>> loaded) throws Exception {
            if (loaded.isSuccess()) {
                Map<K, V> fetched = loaded.getNow();
                result.putAll(fetched);
                cacheMap(fetched);
                promise.trySuccess(result);
            } else {
                promise.tryFailure(loaded.cause());
            }
        }
    });
    return promise;
}
/**
 * Puts every entry of the given map into the local cache.
 */
private void cacheMap(Map<?, ?> map) {
    for (java.util.Map.Entry<?, ?> item : map.entrySet()) {
        final Object itemKey = item.getKey();
        cache.put(toCacheKey(itemKey), new CacheValue(itemKey, item.getValue()));
    }
}
/**
 * Writes all entries to Redis in one script call and, once the write has
 * succeeded, stores them in the local cache.
 * <p>
 * Script argument layout:
 * <ul>
 * <li>ARGV[1] invalidation mode, ARGV[2] number of key/value arguments (2 per entry);</li>
 * <li>ARGV[3 .. ARGV[2]+2] alternating encoded keys and values;</li>
 * <li>mode 2 only: alternating (timestamp, log entry id) pairs, one pair per entry;</li>
 * <li>last argument: the encoded sync message (update or invalidate, by sync strategy).</li>
 * </ul>
 * The map and the update log are written in chunks of 5000 arguments.
 * <p>
 * The update log (KEYS[3]) is written with {@code zadd}: the trailing
 * pairs are (score, member), and the same key is written with
 * {@code zadd} and read as a scored sorted set everywhere else in this
 * class. The previous {@code hmset} targeted the wrong data type.
 *
 * @param map entries to store
 * @return future completing when the entries are written and cached locally
 */
@Override
protected RFuture<Void> putAllOperationAsync(final Map<? extends K, ? extends V> map) {
    List<Object> params = new ArrayList<Object>(map.size()*3);
    params.add(invalidateEntryOnChange);
    params.add(map.size()*2);
    byte[][] hashes = new byte[map.size()][];
    int i = 0;

    for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
        ByteBuf mapKey = encodeMapKey(t.getKey());
        ByteBuf mapValue = encodeMapValue(t.getValue());
        params.add(mapKey);
        params.add(mapValue);
        CacheKey cacheKey = toCacheKey(mapKey);
        hashes[i] = cacheKey.getKeyHash();
        i++;
    }

    final ByteBuf msgEncoded;
    if (syncStrategy == LocalCachedMapOptions.SyncStrategy.UPDATE) {
        // Key/value buffers start at index 2, right after the mode and the count.
        List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>();
        for (int j = 2; j < params.size(); j += 2) {
            ByteBuf key = (ByteBuf) params.get(j);
            ByteBuf value = (ByteBuf) params.get(j+1);
            entries.add(new LocalCachedMapUpdate.Entry(key, value));
        }
        msgEncoded = encode(new LocalCachedMapUpdate(entries));
    } else {
        msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
    }

    if (invalidateEntryOnChange == 2) {
        // One (score, member) pair per entry for the update log.
        long time = System.currentTimeMillis();
        for (byte[] hash : hashes) {
            byte[] entryId = generateLogEntryId(hash);
            params.add(time);
            params.add(entryId);
        }
    }

    // The message is always the last argument; the script addresses it as ARGV[#ARGV].
    params.add(msgEncoded);

    final RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
            "for i=3, tonumber(ARGV[2]) + 2, 5000 do "
              + "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, tonumber(ARGV[2]) + 2))); "
          + "end; "
          + "if ARGV[1] == '1' then "
              + "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
          + "end;"
          + "if ARGV[1] == '2' then "
              + "for i=tonumber(ARGV[2]) + 2 + 1, #ARGV - 1, 5000 do "
                  + "redis.call('zadd', KEYS[3], unpack(ARGV, i, math.min(i+4999, #ARGV - 1))); "
              + "end; "
              + "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
          + "end;",
            Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
            params.toArray());

    future.addListener(new FutureListener<Void>() {
        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            if (!future.isSuccess()) {
                result.tryFailure(future.cause());
                return;
            }

            // Only cache locally after the remote write has succeeded.
            cacheMap(map);
            result.trySuccess(null);
        }
    });
    return result;
}
/**
 * Increments the stored number with {@code HINCRBYFLOAT}, publishes/logs
 * the change according to the invalidation mode, and caches a non-null
 * result locally once the command has succeeded.
 * <p>
 * Script arguments: ARGV[1] key, ARGV[2] increment as a plain decimal
 * string, ARGV[3] invalidation mode, ARGV[4] invalidation message,
 * ARGV[5] timestamp, ARGV[6] log entry id.
 */
@Override
public RFuture<V> addAndGetOperationAsync(final K key, Number value) {
    final ByteBuf keyState = encodeMapKey(key);
    final CacheKey hashedKey = toCacheKey(keyState);
    final ByteBuf invalidation = encode(new LocalCachedMapInvalidate(instanceId, hashedKey.getKeyHash()));
    final byte[] logEntryId = generateLogEntryId(hashedKey.getKeyHash());

    final RedisCommand<Object> command = new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass()));
    final String script =
            "local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); "
          + "if ARGV[3] == '1' then "
              + "redis.call('publish', KEYS[2], ARGV[4]); "
          + "end;"
          + "if ARGV[3] == '2' then "
              + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
              + "redis.call('publish', KEYS[2], ARGV[4]); "
          + "end;"
          + "return result; ";
    final List<Object> scriptKeys =
            Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName());
    final String increment = new BigDecimal(value.toString()).toPlainString();

    RFuture<V> future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, command,
            script, scriptKeys,
            keyState, increment, invalidateEntryOnChange, invalidation, System.currentTimeMillis(), logEntryId);

    future.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> completed) throws Exception {
            if (completed.isSuccess()) {
                V updated = completed.getNow();
                if (updated != null) {
                    cache.put(toCacheKey(key), new CacheValue(key, updated));
                }
            }
        }
    });
    return future;
}
/**
 * Delegates to the parent implementation and caches the entry locally
 * only when the parent reports that the value was actually stored.
 */
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(final K key, final V value) {
    RFuture<Boolean> stored = super.fastPutIfAbsentAsync(key, value);
    stored.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> completed) throws Exception {
            if (completed.isSuccess() && completed.getNow()) {
                cache.put(toCacheKey(key), new CacheValue(key, value));
            }
        }
    });
    return stored;
}
/**
 * Returns all values: the locally cached ones first, followed by the
 * values of every remote entry whose encoded key is not among the locally
 * cached keys (the script compares each hash field against ARGV).
 * <p>
 * NOTE(review): the remote values are decoded through the {@code ALL_KEYS}
 * descriptor (set decoder, map-key value type). This looks unintended for
 * values — confirm.
 */
@Override
public RFuture<Collection<V>> readAllValuesAsync() {
    final List<V> result = new ArrayList<V>();
    final List<Object> cachedKeys = new ArrayList<Object>();
    for (CacheValue cached : cache.values()) {
        cachedKeys.add(encodeMapKey(cached.getKey()));
        result.add((V) cached.getValue());
    }

    final String script =
            "local entries = redis.call('hgetall', KEYS[1]); "
          + "local result = {};"
          + "for j, v in ipairs(entries) do "
              + "if j % 2 ~= 0 then "
                  + "local founded = false;"
                  + "for i = 1, #ARGV, 1 do "
                      + "if ARGV[i] == entries[j] then "
                          + "founded = true;"
                      + "end;"
                  + "end; "
                  + "if founded == false then "
                      + "table.insert(result, entries[j+1]);"
                  + "end;"
              + "end; "
          + "end; "
          + "return result; ";

    final RPromise<Collection<V>> promise = new RedissonPromise<Collection<V>>();
    RFuture<Collection<V>> remote = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS,
            script,
            Arrays.<Object>asList(getName()),
            cachedKeys.toArray());

    remote.addListener(new FutureListener<Collection<V>>() {
        @Override
        public void operationComplete(Future<Collection<V>> loaded) throws Exception {
            if (loaded.isSuccess()) {
                result.addAll(loaded.getNow());
                promise.trySuccess(result);
            } else {
                promise.tryFailure(loaded.cause());
            }
        }
    });
    return promise;
}
/**
 * Returns the whole map: locally cached entries plus whatever
 * {@code readAll} returns for the remaining keys. Entries loaded remotely
 * are also added to the local cache.
 */
@Override
public RFuture<Map<K, V>> readAllMapAsync() {
    final Map<K, V> result = new HashMap<K, V>();
    final List<Object> cachedKeys = new ArrayList<Object>();
    for (CacheValue cached : cache.values()) {
        cachedKeys.add(encodeMapKey(cached.getKey()));
        result.put((K) cached.getKey(), (V) cached.getValue());
    }

    final RPromise<Map<K, V>> promise = new RedissonPromise<Map<K, V>>();
    RFuture<Map<K, V>> remote = readAll(ALL_MAP, cachedKeys, result);
    remote.addListener(new FutureListener<Map<K, V>>() {
        @Override
        public void operationComplete(Future<Map<K, V>> loaded) throws Exception {
            if (!loaded.isSuccess()) {
                promise.tryFailure(loaded.cause());
                return;
            }

            final Map<K, V> fetched = loaded.getNow();
            for (java.util.Map.Entry<K, V> item : fetched.entrySet()) {
                cache.put(toCacheKey(item.getKey()), new CacheValue(item.getKey(), item.getValue()));
            }
            result.putAll(fetched);
            promise.trySuccess(result);
        }
    });
    return promise;
}
/**
 * Best-effort warm-up of the local cache: walks the entries of the
 * uncached parent map and stores each one locally. Entries added while the
 * walk is in progress are not guaranteed to be picked up.
 */
@Override
public void preloadCache() {
    for (Entry<K, V> remoteEntry : super.entrySet()) {
        final K entryKey = remoteEntry.getKey();
        cache.put(toCacheKey(entryKey), new CacheValue(entryKey, remoteEntry.getValue()));
    }
}
/**
 * Blocking variant of {@link #clearLocalCacheAsync()}.
 */
@Override
public void clearLocalCache() {
    final RFuture<Void> cleared = clearLocalCacheAsync();
    get(cleared);
}
/**
 * Publishes a {@code LocalCachedMapClear} message on the invalidation topic.
 * <p>
 * The value produced by the publish call is discarded; callers only learn whether the
 * publish succeeded.
 *
 * @return promise completed with {@code null} once the publish succeeds, or failed with
 *         the publish failure cause
 */
@Override
public RFuture<Void> clearLocalCacheAsync() {
    final RPromise<Void> cleared = new RedissonPromise<Void>();
    RFuture<Long> published = invalidationTopic.publishAsync(new LocalCachedMapClear());
    published.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> completed) throws Exception {
            if (completed.isSuccess()) {
                cleared.trySuccess(null);
            } else {
                cleared.tryFailure(completed.cause());
            }
        }
    });
    return cleared;
}
/**
 * Asynchronously assembles the set of all entries by combining what is already held in the
 * local cache with what is fetched through {@code readAll}.
 * <p>
 * The encoded keys of the locally cached entries are handed to {@code readAll}; whatever
 * comes back is stored in the local cache and added to the returned set.
 *
 * @return promise completed with the combined entry set, or failed with the cause of the
 *         read failure
 */
@Override
public RFuture<Set<Entry<K, V>>> readAllEntrySetAsync() {
    final Set<Entry<K, V>> combined = new HashSet<Entry<K, V>>();
    List<Object> knownKeys = new ArrayList<Object>();
    // Seed the answer with the local cache and remember which keys are already known.
    for (CacheValue cached : cache.values()) {
        knownKeys.add(encodeMapKey(cached.getKey()));
        combined.add(new AbstractMap.SimpleEntry<K, V>((K)cached.getKey(), (V)cached.getValue()));
    }

    final RPromise<Set<Entry<K, V>>> promise = new RedissonPromise<Set<Entry<K, V>>>();
    readAll(ALL_ENTRIES, knownKeys, combined).addListener(new FutureListener<Set<Entry<K, V>>>() {
        @Override
        public void operationComplete(Future<Set<Entry<K, V>>> completed) throws Exception {
            if (completed.isSuccess()) {
                Set<Entry<K, V>> fetched = completed.getNow();
                // Store every fetched entry in the local cache before exposing it to the caller.
                for (java.util.Map.Entry<K, V> fetchedEntry : fetched) {
                    cache.put(toCacheKey(fetchedEntry.getKey()),
                            new CacheValue(fetchedEntry.getKey(), fetchedEntry.getValue()));
                }
                combined.addAll(fetched);
                promise.trySuccess(combined);
            } else {
                promise.tryFailure(completed.cause());
            }
        }
    });
    return promise;
}
/**
 * Evaluates a read script that runs {@code hgetall} on this map's hash and returns, as a
 * flat key/value list, only those entries whose key equals none of the supplied arguments.
 *
 * @param evalCommandType command that determines how the script reply is converted
 * @param mapKeys encoded keys to leave out of the reply
 * @param result not read by this method; its static type fixes {@code R} at the call site
 * @param <R> type of the converted reply
 * @return future of the converted script reply
 */
private <R> RFuture<R> readAll(RedisCommand<?> evalCommandType, List<Object> mapKeys, R result) {
    // In the hgetall reply odd positions are field names, each followed by its value.
    String script =
              "local entries = redis.call('hgetall', KEYS[1]); "
            + "local result = {};"
            + "for j, v in ipairs(entries) do "
            + "if j % 2 ~= 0 then "
            + "local founded = false;"
            + "for i = 1, #ARGV, 1 do "
            + "if ARGV[i] == entries[j] then "
            + "founded = true;"
            + "end;"
            + "end; "
            + "if founded == false then "
            + "table.insert(result, entries[j]);"
            + "table.insert(result, entries[j+1]);"
            + "end;"
            + "end; "
            + "end; "
            + "return result; ";
    List<Object> scriptKeys = Arrays.<Object>asList(getName());
    Object[] excludedKeys = mapKeys.toArray();
    return commandExecutor.evalReadAsync(getName(), codec, evalCommandType, script, scriptKeys, excludedKeys);
}
/**
 * Replaces the value of an existing hash field on the server.
 * <p>
 * The script only acts when the field exists: it reads the current value, stores the new
 * one and returns the previous value; otherwise it returns nil. Depending on
 * {@code invalidateEntryOnChange} (passed as ARGV[3]) it additionally publishes the sync
 * message on the first invalidation channel ({@code '1'}), or first adds the log entry id
 * to the updates-log sorted set scored by the current time and then publishes ({@code '2'}).
 *
 * @param key map key whose value should be replaced
 * @param value new value
 * @return future of the script reply decoded as a map value
 */
@Override
protected RFuture<V> replaceOperationAsync(K key, V value) {
    ByteBuf encodedKey = encodeMapKey(key);
    ByteBuf encodedValue = encodeMapValue(value);
    CacheKey cacheKey = toCacheKey(encodedKey);
    byte[] logEntryId = generateLogEntryId(cacheKey.getKeyHash());
    ByteBuf syncMessage = createSyncMessage(encodedKey, encodedValue, cacheKey);

    String script =
              "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
            + "local v = redis.call('hget', KEYS[1], ARGV[1]); "
            + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
            + "if ARGV[3] == '1' then "
            + "redis.call('publish', KEYS[2], ARGV[4]); "
            + "end;"
            + "if ARGV[3] == '2' then "
            + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
            + "redis.call('publish', KEYS[2], ARGV[4]); "
            + "end;"
            + "return v; "
            + "else "
            + "return nil; "
            + "end";
    // KEYS[1] = map hash, KEYS[2] = invalidation channel, KEYS[3] = updates log
    List<Object> scriptKeys = Arrays.<Object>asList(
            getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName());
    return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, script, scriptKeys,
            encodedKey, encodedValue, invalidateEntryOnChange, syncMessage, System.currentTimeMillis(), logEntryId);
}
/**
 * Delegates the replace to the parent map and, when it succeeds with a non-null result,
 * stores the new value in the local cache.
 *
 * @param key map key whose value should be replaced
 * @param value new value
 * @return the future returned by the parent implementation
 */
@Override
public RFuture<V> replaceAsync(final K key, final V value) {
    RFuture<V> replaced = super.replaceAsync(key, value);
    replaced.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> completed) throws Exception {
            // Failures and null results leave the local cache untouched.
            boolean gotPrevious = completed.isSuccess() && completed.getNow() != null;
            if (gotPrevious) {
                cache.put(toCacheKey(key), new CacheValue(key, value));
            }
        }
    });
    return replaced;
}
/**
 * Replaces the value of a hash field on the server only if it currently equals the
 * expected old value.
 * <p>
 * When the stored value matches, the script stores the new value and returns 1; otherwise
 * it returns 0. Depending on {@code invalidateEntryOnChange} (passed as ARGV[4]) a
 * successful swap additionally publishes the sync message on the first invalidation channel
 * ({@code '1'}), or first adds the log entry id to the updates-log sorted set scored by the
 * current time and then publishes ({@code '2'}).
 *
 * @param key map key whose value should be replaced
 * @param oldValue value expected to be currently stored
 * @param newValue value to store when the expectation holds
 * @return future of the script reply decoded as a boolean
 */
@Override
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
    ByteBuf encodedKey = encodeMapKey(key);
    ByteBuf encodedOldValue = encodeMapValue(oldValue);
    ByteBuf encodedNewValue = encodeMapValue(newValue);
    CacheKey cacheKey = toCacheKey(encodedKey);
    byte[] logEntryId = generateLogEntryId(cacheKey.getKeyHash());
    ByteBuf syncMessage = createSyncMessage(encodedKey, encodedNewValue, cacheKey);

    String script =
              "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
            + "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
            + "if ARGV[4] == '1' then "
            + "redis.call('publish', KEYS[2], ARGV[5]); "
            + "end;"
            + "if ARGV[4] == '2' then "
            + "redis.call('zadd', KEYS[3], ARGV[6], ARGV[7]);"
            + "redis.call('publish', KEYS[2], ARGV[5]); "
            + "end;"
            + "return 1; "
            + "else "
            + "return 0; "
            + "end";
    // KEYS[1] = map hash, KEYS[2] = invalidation channel, KEYS[3] = updates log
    List<Object> scriptKeys = Arrays.<Object>asList(
            getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName());
    return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script, scriptKeys,
            encodedKey, encodedOldValue, encodedNewValue, invalidateEntryOnChange, syncMessage, System.currentTimeMillis(), logEntryId);
}
/**
 * Delegates the conditional replace to the parent map and, when it reports {@code true},
 * stores the new value in the local cache.
 *
 * @param key map key whose value should be replaced
 * @param oldValue value expected to be currently stored
 * @param newValue value to store when the expectation holds
 * @return the future returned by the parent implementation
 */
@Override
public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue) {
    // The cache key is derived up front, before the remote call is issued.
    final CacheKey localKey = toCacheKey(key);
    RFuture<Boolean> replaced = super.replaceAsync(key, oldValue, newValue);
    replaced.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> completed) throws Exception {
            boolean swapped = completed.isSuccess() && completed.getNow();
            if (swapped) {
                cache.put(localKey, new CacheValue(key, newValue));
            }
        }
    });
    return replaced;
}
/**
 * Removes a hash field on the server only if it currently holds the given value.
 * <p>
 * When the stored value matches, the script returns the result of {@code hdel}; otherwise
 * it returns 0. Before deleting, depending on {@code invalidateEntryOnChange} (passed as
 * ARGV[3]), it publishes the invalidation message on the first invalidation channel
 * ({@code '1'}), or first adds the log entry id to the updates-log sorted set scored by the
 * current time and then publishes ({@code '2'}).
 *
 * @param key map key to remove
 * @param value value expected to be currently stored
 * @return future of the script reply decoded as a boolean
 */
@Override
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
    ByteBuf encodedKey = encodeMapKey(key);
    ByteBuf encodedValue = encodeMapValue(value);
    CacheKey cacheKey = toCacheKey(encodedKey);
    byte[] logEntryId = generateLogEntryId(cacheKey.getKeyHash());
    ByteBuf invalidateMessage = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));

    String script =
              "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
            + "if ARGV[3] == '1' then "
            + "redis.call('publish', KEYS[2], ARGV[4]); "
            + "end;"
            + "if ARGV[3] == '2' then "
            + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
            + "redis.call('publish', KEYS[2], ARGV[4]); "
            + "end;"
            + "return redis.call('hdel', KEYS[1], ARGV[1]) "
            + "else "
            + "return 0 "
            + "end";
    // KEYS[1] = map hash, KEYS[2] = invalidation channel, KEYS[3] = updates log
    List<Object> scriptKeys = Arrays.<Object>asList(
            getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName());
    return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script, scriptKeys,
            encodedKey, encodedValue, invalidateEntryOnChange, invalidateMessage, System.currentTimeMillis(), logEntryId);
}
/**
 * Delegates the conditional remove to the parent map and, when it reports {@code true},
 * drops the entry from the local cache.
 *
 * @param key map key to remove
 * @param value value expected to be currently stored
 * @return the future returned by the parent implementation
 */
@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
    // The cache key is derived up front, before the remote call is issued.
    final CacheKey localKey = toCacheKey(key);
    RFuture<Boolean> removed = super.removeAsync(key, value);
    removed.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> completed) throws Exception {
            boolean deleted = completed.isSuccess() && completed.getNow();
            if (deleted) {
                cache.remove(localKey);
            }
        }
    });
    return removed;
}
/**
 * Delegates the put-if-absent to the parent map and, when it succeeds with a {@code null}
 * result, stores the given value in the local cache.
 *
 * @param key map key
 * @param value value to associate with the key
 * @return the future returned by the parent implementation
 */
@Override
public RFuture<V> putIfAbsentAsync(final K key, final V value) {
    RFuture<V> stored = super.putIfAbsentAsync(key, value);
    stored.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> completed) throws Exception {
            // Failures and non-null results leave the local cache untouched.
            boolean nullResult = completed.isSuccess() && completed.getNow() == null;
            if (nullResult) {
                cache.put(toCacheKey(key), new CacheValue(key, value));
            }
        }
    });
    return stored;
}
/**
 * Encodes the given object with the value encoder of the topic codec.
 *
 * @param value object to encode
 * @return buffer holding the encoded form
 * @throws IllegalArgumentException wrapping the {@link IOException} raised by the encoder
 */
@Override
public ByteBuf encode(Object value) {
    ByteBuf encoded;
    try {
        encoded = topicCodec.getValueEncoder().encode(value);
    } catch (IOException encodingFailure) {
        throw new IllegalArgumentException(encodingFailure);
    }
    return encoded;
}
}