/** * Copyright 2018 Nikita Koksharov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.fr.third.org.redisson; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import com.fr.third.org.redisson.api.RFuture; import com.fr.third.org.redisson.api.RKeys; import com.fr.third.org.redisson.api.RObject; import com.fr.third.org.redisson.api.RType; import com.fr.third.org.redisson.client.RedisClient; import com.fr.third.org.redisson.client.RedisException; import com.fr.third.org.redisson.client.codec.ScanCodec; import com.fr.third.org.redisson.client.codec.StringCodec; import com.fr.third.org.redisson.client.protocol.RedisCommands; import com.fr.third.org.redisson.client.protocol.RedisStrictCommand; import com.fr.third.org.redisson.client.protocol.decoder.ListScanResult; import com.fr.third.org.redisson.client.protocol.decoder.ScanObjectEntry; import com.fr.third.org.redisson.misc.CompositeIterable; import com.fr.third.org.redisson.misc.RPromise; import com.fr.third.org.redisson.misc.RedissonPromise; import com.fr.third.org.redisson.api.RFuture; import com.fr.third.org.redisson.api.RKeys; import com.fr.third.org.redisson.api.RObject; import com.fr.third.org.redisson.api.RType; import com.fr.third.org.redisson.client.RedisClient; import com.fr.third.org.redisson.client.RedisException; import com.fr.third.org.redisson.client.codec.ScanCodec; import com.fr.third.org.redisson.client.codec.StringCodec; import com.fr.third.org.redisson.client.protocol.RedisCommands; import com.fr.third.org.redisson.client.protocol.RedisStrictCommand; import com.fr.third.org.redisson.client.protocol.decoder.ListScanResult; import com.fr.third.org.redisson.client.protocol.decoder.ScanObjectEntry; import com.fr.third.org.redisson.command.CommandAsyncExecutor; import com.fr.third.org.redisson.command.CommandBatchService; import com.fr.third.org.redisson.connection.MasterSlaveEntry; import com.fr.third.org.redisson.misc.CompositeIterable; import com.fr.third.org.redisson.misc.RPromise; import com.fr.third.org.redisson.misc.RedissonPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; /** * * @author Nikita Koksharov * */ public class RedissonKeys implements RKeys { private final CommandAsyncExecutor commandExecutor; public RedissonKeys(CommandAsyncExecutor commandExecutor) { super(); this.commandExecutor = commandExecutor; } @Override public RType getType(String key) { return commandExecutor.get(getTypeAsync(key)); } @Override public RFuture getTypeAsync(String key) { return commandExecutor.readAsync(key, RedisCommands.TYPE, key); } @Override public int getSlot(String key) { return commandExecutor.get(getSlotAsync(key)); } @Override public RFuture getSlotAsync(String key) { return commandExecutor.readAsync(null, RedisCommands.KEYSLOT, key); } @Override public Iterable getKeysByPattern(String pattern) { return getKeysByPattern(pattern, 10); } public Iterable getKeysByPattern(final String pattern, final int count) { List> iterables = new ArrayList>(); for (final MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { Iterable iterable = new Iterable() { @Override public Iterator iterator() { return createKeysIterator(entry, pattern, count); } }; iterables.add(iterable); } return new CompositeIterable(iterables); } @Override public Iterable getKeys() { return getKeysByPattern(null); } private ListScanResult scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count); return commandExecutor.get(f); } RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); return commandExecutor.get(f); } private Iterator createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) { return new RedissonBaseIterator() { @Override ListScanResult iterator(RedisClient client, long nextIterPos) { return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); } @Override void remove(String value) { RedissonKeys.this.delete(value); } }; } @Override public long touch(String... names) { return commandExecutor.get(touchAsync(names)); } @Override public RFuture touchAsync(String... names) { return commandExecutor.writeAllAsync(RedisCommands.TOUCH_LONG, new SlotCallback() { AtomicLong results = new AtomicLong(); @Override public void onSlotResult(Long result) { results.addAndGet(result); } @Override public Long onFinish() { return results.get(); } }, names); } @Override public long countExists(String... names) { return commandExecutor.get(countExistsAsync(names)); } @Override public RFuture countExistsAsync(String... names) { return commandExecutor.readAllAsync(RedisCommands.EXISTS_LONG, new SlotCallback() { AtomicLong results = new AtomicLong(); @Override public void onSlotResult(Long result) { results.addAndGet(result); } @Override public Long onFinish() { return results.get(); } }, names); } @Override public String randomKey() { return commandExecutor.get(randomKeyAsync()); } @Override public RFuture randomKeyAsync() { return commandExecutor.readRandomAsync(RedisCommands.RANDOM_KEY); } @Override public Collection findKeysByPattern(String pattern) { return commandExecutor.get(findKeysByPatternAsync(pattern)); } @Override public RFuture> findKeysByPatternAsync(String pattern) { return commandExecutor.readAllAsync(RedisCommands.KEYS, pattern); } @Override public long deleteByPattern(String pattern) { return commandExecutor.get(deleteByPatternAsync(pattern)); } @Override public RFuture deleteByPatternAsync(final String pattern) { final int batchSize = 100; final RPromise result = new RedissonPromise(); final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); Collection entries = commandExecutor.getConnectionManager().getEntrySet(); final AtomicLong executed = new AtomicLong(entries.size()); final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { count.addAndGet(future.getNow()); } else { failed.set(future.cause()); } checkExecution(result, failed, count, executed); } }; for (final MasterSlaveEntry entry : entries) { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { @Override public void run() { long count = 0; try { Iterator keysIterator = createKeysIterator(entry, pattern, batchSize); List keys = new ArrayList(); while (keysIterator.hasNext()) { String key = keysIterator.next(); keys.add(key); if (keys.size() % batchSize == 0) { count += delete(keys.toArray(new String[keys.size()])); keys.clear(); } } if (!keys.isEmpty()) { count += delete(keys.toArray(new String[keys.size()])); keys.clear(); } Future future = ImmediateEventExecutor.INSTANCE.newSucceededFuture(count); future.addListener(listener); } catch (Exception e) { Future future = ImmediateEventExecutor.INSTANCE.newFailedFuture(e); future.addListener(listener); } } }); } return result; } @Override public long delete(String ... keys) { return commandExecutor.get(deleteAsync(keys)); } @Override public long delete(RObject... objects) { return commandExecutor.get(deleteAsync(objects)); } @Override public RFuture deleteAsync(RObject ... objects) { List keys = new ArrayList(); for (RObject obj : objects) { keys.add(obj.getName()); } return deleteAsync(keys.toArray(new String[keys.size()])); } @Override public long unlink(String ... keys) { return commandExecutor.get(deleteAsync(keys)); } @Override public RFuture unlinkAsync(String ... keys) { return executeAsync(RedisCommands.UNLINK, keys); } @Override public RFuture deleteAsync(String ... keys) { return executeAsync(RedisCommands.DEL, keys); } private RFuture executeAsync(RedisStrictCommand command, String ... keys) { if (!commandExecutor.getConnectionManager().isClusterMode()) { return commandExecutor.writeAsync(null, command, keys); } Map> range2key = new HashMap>(); for (String key : keys) { int slot = commandExecutor.getConnectionManager().calcSlot(key); MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(slot); List list = range2key.get(entry); if (list == null) { list = new ArrayList(); range2key.put(entry, list); } list.add(key); } final RPromise result = new RedissonPromise(); final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); final AtomicLong executed = new AtomicLong(range2key.size()); FutureListener> listener = new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { if (future.isSuccess()) { List result = (List) future.get(); for (Long res : result) { if (res != null) { count.addAndGet(res); } } } else { failed.set(future.cause()); } checkExecution(result, failed, count, executed); } }; for (Entry> entry : range2key.entrySet()) { // executes in batch due to CROSSLOT error CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (String key : entry.getValue()) { executorService.writeAsync(entry.getKey(), null, command, key); } RFuture> future = executorService.executeAsync(); future.addListener(listener); } return result; } @Override public long count() { return commandExecutor.get(countAsync()); } @Override public RFuture countAsync() { return commandExecutor.readAllAsync(RedisCommands.DBSIZE, new SlotCallback() { AtomicLong results = new AtomicLong(); @Override public void onSlotResult(Long result) { results.addAndGet(result); } @Override public Long onFinish() { return results.get(); } }); } @Override public void flushdbParallel() { commandExecutor.get(flushdbParallelAsync()); } @Override public RFuture flushdbParallelAsync() { return commandExecutor.writeAllAsync(RedisCommands.FLUSHDB_ASYNC); } @Override public void flushallParallel() { commandExecutor.get(flushallParallelAsync()); } @Override public RFuture flushallParallelAsync() { return commandExecutor.writeAllAsync(RedisCommands.FLUSHALL_ASYNC); } @Override public void flushdb() { commandExecutor.get(flushdbAsync()); } @Override public RFuture flushdbAsync() { return commandExecutor.writeAllAsync(RedisCommands.FLUSHDB); } @Override public void flushall() { commandExecutor.get(flushallAsync()); } @Override public RFuture flushallAsync() { return commandExecutor.writeAllAsync(RedisCommands.FLUSHALL); } private void checkExecution(final RPromise result, final AtomicReference failed, final AtomicLong count, final AtomicLong executed) { if (executed.decrementAndGet() == 0) { if (failed.get() != null) { if (count.get() > 0) { RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get()); result.tryFailure(ex); } else { result.tryFailure(failed.get()); } } else { result.trySuccess(count.get()); } } } @Override public long remainTimeToLive(String name) { return commandExecutor.get(remainTimeToLiveAsync(name)); } @Override public RFuture remainTimeToLiveAsync(String name) { return commandExecutor.readAsync(name, StringCodec.INSTANCE, RedisCommands.PTTL, name); } @Override public void rename(String currentName, String newName) { commandExecutor.get(renameAsync(currentName, newName)); } @Override public RFuture renameAsync(String currentName, String newName) { return commandExecutor.writeAsync(currentName, RedisCommands.RENAME, currentName, newName); } @Override public boolean renamenx(String oldName, String newName) { return commandExecutor.get(renamenxAsync(oldName, newName)); } @Override public RFuture renamenxAsync(String oldName, String newName) { return commandExecutor.writeAsync(oldName, RedisCommands.RENAMENX, oldName, newName); } @Override public boolean clearExpire(String name) { return commandExecutor.get(clearExpireAsync(name)); } @Override public RFuture clearExpireAsync(String name) { return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PERSIST, name); } @Override public boolean expireAt(String name, long timestamp) { return commandExecutor.get(expireAtAsync(name, timestamp)); } @Override public RFuture expireAtAsync(String name, long timestamp) { return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIREAT, name, timestamp); } @Override public boolean expire(String name, long timeToLive, TimeUnit timeUnit) { return commandExecutor.get(expireAsync(name, timeToLive, timeUnit)); } @Override public RFuture expireAsync(String name, long timeToLive, TimeUnit timeUnit) { return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIRE, name, timeUnit.toMillis(timeToLive)); } @Override public void migrate(String name, String host, int port, int database) { commandExecutor.get(migrateAsync(name, host, port, database)); } @Override public RFuture migrateAsync(String name, String host, int port, int database) { return commandExecutor.writeAsync(name, RedisCommands.MIGRATE, host, port, name, database); } @Override public boolean move(String name, int database) { return commandExecutor.get(moveAsync(name, database)); } @Override public RFuture moveAsync(String name, int database) { return commandExecutor.writeAsync(name, RedisCommands.MOVE, name, database); } }