/** * Copyright 2018 Nikita Koksharov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.fr.third.org.redisson; import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import com.fr.third.org.redisson.api.RFuture; import com.fr.third.org.redisson.api.RLock; import com.fr.third.org.redisson.client.codec.LongCodec; import com.fr.third.org.redisson.client.codec.StringCodec; import com.fr.third.org.redisson.client.protocol.RedisCommands; import com.fr.third.org.redisson.client.protocol.RedisStrictCommand; import com.fr.third.org.redisson.pubsub.LockPubSub; import com.fr.third.org.redisson.api.RFuture; import com.fr.third.org.redisson.api.RLock; import com.fr.third.org.redisson.client.codec.LongCodec; import com.fr.third.org.redisson.client.codec.StringCodec; import com.fr.third.org.redisson.client.protocol.RedisCommands; import com.fr.third.org.redisson.client.protocol.RedisStrictCommand; import com.fr.third.org.redisson.command.CommandAsyncExecutor; import com.fr.third.org.redisson.pubsub.LockPubSub; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; /** * Lock will be removed automatically if client disconnects. * * @author Nikita Koksharov * */ public class RedissonReadLock extends RedissonLock implements RLock { public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } @Override String getChannelName() { return prefixName("redisson_rwlock", getName()); } String getWriteLockName(long threadId) { return super.getLockName(threadId) + ":write"; } String getReadWriteTimeoutNamePrefix(long threadId) { return suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout"; } @Override RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('set', KEYS[2] .. ':1', 1); " + "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" + "redis.call('set', key, 1); " + "redis.call('pexpire', key, ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end;" + "return redis.call('pttl', KEYS[1]);", Arrays.asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); } @Override protected RFuture unlockInnerAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = timeoutPrefix.split(":" + getLockName(threadId))[0]; return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " + "if (lockExists == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter == 0) then " + "redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" + "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " + "if (redis.call('hlen', KEYS[1]) > 1) then " + "local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " + "if maxRemainTime > 0 then " + "redis.call('pexpire', KEYS[1], maxRemainTime); " + "return 0; " + "end;" + "if mode == 'write' then " + "return 0;" + "end; " + "end; " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; ", Arrays.asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.unlockMessage, getLockName(threadId)); } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } @Override public RFuture forceUnlockAsync() { RFuture result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return 0; ", Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage); result.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess() && future.getNow()) { cancelExpirationRenewal(); } } }); return result; } @Override public boolean isLocked() { RFuture future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); String res = get(future); return "read".equals(res); } }