Redisson 的代码质量相当的不错,从抽象设计 代码的思路 和 顺序 读别人源码都非常的清晰,不想Spring Cloud 的eureka 和 hystrix 读起来 相当的混乱。
可重入锁对吧,就是一个线程加锁两次,那么redisson 又是如何实现的呢,其实这里还是要去看lua脚本
lock()加锁的逻辑是一样的,最本质的区别再与lua脚本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
} if (redis.call('exists', KEYS[1]) == 0)
这段脚本就是第一次上锁逻辑,就直接跳过了,前面有讲
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
这段脚本,什么意思?意思是如果key已经存在,其实就是代表已经被一个线程上锁了,再去判断key 对应的map数据结构中是否存在 线程唯一标识(UUID+":" + ThreadId )作为map中的key是否存在,别忘了锁对应在redis中的数据结构
lockName
{
“0d39099896e6400d9f97127cbae18090:1” :1
}
(1)如果根据key 和 当前线程 存在,也就代表持有锁的线程就是当前线程
(2)如果没有存在,说明当前线程并不是持有锁的线程,会走下面的逻辑return redis.call('pttl', KEYS[1]) 返回锁剩余的过期时间
这里我们再看看可重入锁的脚本
redis.call('hincrby', KEYS[1], ARGV[2], 1);
这段脚本的意思就是将对应的锁结构中的的线程标志的值 +1,那么其实这里的值就代表,重入的次数
redis.call('pexpire', KEYS[1], ARGV[1]);
将过期时间重新设置,默认是30S
以上就是可重入的逻辑。
那么我们再来分析下多个线程或者多台机器上的互斥阻塞问题
我们需要回到前面的代码逻辑
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//如果ttl为null 我们知道是加锁成功了 就直接返回
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
//加锁失败,就会再这里,不停的循环尝试加锁 死循环
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
//这里是什么意思?这里会涉及到其他的同步组件
//比如Semaphore 信号量等
//所以这里我们先知道这里 肯定会等待一段时间,然后重新尝试获取锁
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
} 其实这里我们就可以看到,如果一个线程尝试获取锁失败,然后就会进入一个死循环,然后不停的等待尝试获取锁,那么现在是要知道如何感知要去尝试获取锁这个操作对吧。后面我们会看到,这里简单说一下 是基于redis 的发布订阅 channel 来实现的。






还没有评论,来说两句吧...