读写锁得概念我们就不再赘述了
public static void main(String[] args) throws Exception {
//构建一个配置信息对象
Config config = new Config();
config.useClusterServers()
//定时扫描连接信息 默认1000ms
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7001");
//因为Redisson 是基于redis封装的一套便于复杂操作的框架
//所以这里构建对象肯定是创建一些与redis的连接
RedissonClient redisson = Redisson.create(config);
ReadWriteLock readWriteLock = redisson.getReadWriteLock("lock");
readWriteLock.readLock().lock();
readWriteLock.writeLock().lock();
//释放锁
readWriteLock.readLock().unlock();
readWriteLock.writeLock().unlock();
}
我们先来看看读锁,RedissonReadLock ,其实我们大概现在都知道了,加锁得逻辑最主要还是去理解lua脚本,RedissonReadLock 覆盖了一些主要得方法,我们主要研究 加锁 释放锁 以及renew续约的lua脚本
我们直接来看RedissonReadLock 加锁的lua脚本
还是研究参数代表的意义
KEY[1] = 锁的名称
KEY[2] = getReadWriteTimeoutNamePrefix(threadId) {lockName};UUID_01:ThreadId_01 : rwlock_timeout
ARGV[1] = internalLockLeaseTime
ARGV[2] = getLockName(threadId) UUID_01:threadId
ARGV[3] = getWriteLockName(threadId) UUID_01:ThreadId:write
读锁的lua脚本
最简单的加锁逻辑是非常的清晰的,但是我们肯定是要分析复杂的逻辑对吧,比如可重入性
(1)连续加2次读锁
(2)连续加2次写锁
(3)先读锁,后写锁
(4)先写锁,后读锁
这几种情况是如何运转的? 也就是研究锁的可重入性
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//hget key mode 就是代表从key对应的map数据结构中获取mode对应的值
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
//设置模式为读模式
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
//设置读锁持有的线程名称
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//两个点是什么意思? 就是字符串拼接,将KEY[2]:1 生成新的字符串
"redis.call('set', KEYS[2] .. ':1', 1); " +
//设置KEY[1] KEY[2]的过期时间
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//锁的可重入性逻辑脚本 ,不管之前是读还是写都将其 +1
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
//将对应的值 +1
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //设置 该次加锁对应timeout key
//大概长这个样子{lockName}:UUID_01:threadId_01:rwLock_timeout:2 1
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
} lockName{
“mode ”:“read”,
“UUID:ThreadId_01”:1
}
{lockName}:UUID_01:threadId_01:rwLock_timeout:1 1
读锁就加锁成功,那么我们还记得有一个watchdog的概念,对吧,就是每隔一段时间去更新锁key的过期时间,RedissonReadLock 对该方法进行了重写 renewExpirationAsync 方法,那么读锁的watchdog 肯定和普通锁不一样 对吧,我们看下lua 脚本的运转
KEY[1] = LockName
KEY[2] = keyPrefix = {lockName}
ARGV[1] = 3000ms
ARGV[2] = UUID_01:threadId_01
@Override
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取一下当前这个线程对这个锁 加了一个读锁
"local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
//如果 不等于 false,就是已经上了读锁
"if (counter ~= false) then " +
//就会对其过期时间进行更新
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//hlen lockName > 1 意思就是kv对超过了一个,
"if (redis.call('hlen', KEYS[1]) > 1) then " +
//获取lockName 的所有keys
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
//获取每个key的值 其实就是获取每个线程的重入锁的次数
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
//如果counter 是数字类型
"if type(counter) == 'number' then " +
//就会遍历将counter 从 5 4 3 2 1 遍历
"for i=counter, 1, -1 do " +
//pexpire {lockName}:UUID_01:ThreadId_01:rwlock_timeout:1 3000 意思就是将该key 过期时间设置为3000毫秒
"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"return 1; " +
"end; " +
"return 0;",
Arrays.<Object>asList(getName(), keyPrefix),
internalLockLeaseTime, getLockName(threadId));
} 其实lua脚本的意思就是。watchdog 这个线程对把,会将对应的lockName 的key 的过期时间刷新,同时也会对counter 遍历。这个咋说呢,就是将对应那个key的每次加锁对应的一个timeout key ({lockName}:UUID_01:threadId_01:rwLock_timeout:1)的生命周期也刷新一次
加写锁是什么逻辑?RedissonWriteLock其实也是RedissonLock 的子类,我们来看下最基本的逻辑
KEY[1] = LockName
ARGV[1] = 3000ms
ARGV[2] = UUID_01:threadId_01:write
lockName{
“mode”:"write"
"UUID_01:ThreadId_01":3000
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
//设置模式类型为write
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
//设置当前线程对应的标志位
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果之前是写锁就执行脚本,如果之前是读锁,那么直接互斥
//什么意思?也就是同一个客户端的同一个线程先读锁后写锁是互斥的
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local currentExpire = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()),
internalLockLeaseTime, getLockName(threadId));
} 那么看了简单的源码 我们能得出几个关键的东西
(1)同一个客户端先读锁 后 写锁是不支持的,是互斥的
(2)写锁的模式中只会操作map数据结构,而读锁的数据结构 会对应的生成一个timeout key 来对应该次的加锁
(3) 同一客户端同一线程 支持先写后读的重入,注意是同一客户端的同一个线程,不同线程是不支持的
lockName{
"mode":"write"
"UUID:ThreadId:write":1
"UUID:ThreadId":1
}
{lockName}:UUID:ThreadId:rwlock_timeout:1 1 3000ms 过期
可以看出来,读写锁也是一种可重入锁, 同类型的重入比较简单,读写重入存在互斥,也存在允许的情况,对应的值就是重入的次数。
但是大家有没发现一个问题,如果是读锁,其他的客户端 线程能不停的往里面加入,但是此时来了一个写锁,就会无法执行,只有等待,但是这里却没与一个机制去保证写锁之后的读锁获取的屏障,所以 有可能造成写锁的饥饿
释放锁机制
释放读锁
KEY[1] = LockName
KEY[2] = redisson_rwlock:{lockName}
KEY[3] = {lockName}:UUID:ThreadId:rwlock_timeout
KEY[4] = {lockName}
ARGV[1] = 0
ARGV[2] = UUID:ThreadId:01
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取加速的mode 有可能是write 也有可能是read
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
//不存在直接返回
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
//获取当前线程持有的锁信息
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
//不存在直接返回
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
//获取当前线程 重入的次数 - 1
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
//如果为0 代表已经完全可以释放锁,直接删除对应的map中的数据
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
//删除对应的那次 timeout key
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
//如果map 数据结构中的key超过1,什么意思?就是代表还有线程持有锁
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
//循环遍历keys,如果value 是数字类型,那么就获取对应的timeout key 的过期时间,保存一个最大的剩余过期时间
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
//设置 lockName key的过期时间
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
//如果是写锁模式 直接返回
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
//这是什么情况能到达呢,就是keys=1 就是代表没有客户端持有锁
//就将lockName对应的锁的key 删除
"redis.call('del', KEYS[1]); " +
//同时发布一个消息,通知其他客户端
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.unlockMessage, getLockName(threadId));
} 释放写锁
大概逻辑就是
(1)先判断当前线程是否是持有锁
(2)将当前线程的对应值 -1
(3)-1之后 为0,代表当前线程可以释放锁,将map数据结构中的该线程删除
(4)删除那次生成的timeout key
(5)判断锁 map结构长度是否大于1,大于1代表还有其他线程持有锁,获取timeout key的最大过期时间,并将key 更新ttl,如果只剩write的线程,那么就不会修改任何值
(6)如果keys 长度小于1,代表没有任何客户端以及线程持有锁,那么就会将锁对应的key进行删除
释放写锁
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取mode
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果是写锁
"if (mode == 'write') then " +
//获取当前线程持有锁的标志位是否存在
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
"if (lockExists == 0) then " +
"return nil;" +
"else " +
//将标志位的value 值-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果 -1 后大于0,将 lock key 重新设置过期时间
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//如果=0 代表应该释放锁,将对应值删除
"redis.call('hdel', KEYS[1], ARGV[3]); " +
//判断lock map数据结构中的长度是否为1
//什么意思?就是判断是否还有重入的读锁
//如果没有就删除lock key,同时发送消息
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"else " +
//将mode 模式切换为read 模式
// has unlocked read-locks
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"end; " +
"return 1; "+
"end; " +
"end; " +
"end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
} 写锁释放的逻辑基本来说都比较类似,比较不一样的地方还是,如果是先写后读的重入,如果先释放写锁,那么mode 就会被更改为read模式






还没有评论,来说两句吧...