MutiLock 说明东西?就是可以将多个锁合并成一个大锁,对一个大锁进行统一的申请和释放。其实就是一次性的去锁定多个资源,然后处理业务,最后统一释放
我们看源码之前先思考一下,这个如果基于之前的一些思想,如何去实现这个功能????难道是就是对多个所RedissonLock 依次去加锁?所有的锁加锁成功就代表MultiLock加锁成功?
我们还是先看看Redisson如何使用,也就是我们看源码的入口对把
public static void main(String[] args) throws Exception {
//构建一个配置信息对象
Config config = new Config();
config.useClusterServers()
//定时扫描连接信息 默认1000ms
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7001");
//因为Redisson 是基于redis封装的一套便于复杂操作的框架
//所以这里构建对象肯定是创建一些与redis的连接
RedissonClient redisson = Redisson.create(config);
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RedissonMultiLock multiLock = new RedissonMultiLock(lock1,lock2);
multiLock.lock();
//释放锁
multiLock.unlock();
} 感觉就是去将多个RLock锁实例进行统一的操作管理?
我们看看实例化MultiLock传入的参数干嘛了,将其存入到了一个list 数组中,记住名字 locks
final List<RLock> locks = new ArrayList<RLock>();
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
} 进入到RedissonMultiLock 内部,现在基本上和之前差不多的代码
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} @Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
/基础等待时间 这里我们按照3个lock 来统一计算 baseWaitTime = 4500MS
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
//这里还是支持一些获取锁超时机制
//计算一些等待时间
if (leaseTime == -1) {
waitTime = baseWaitTime;
unit = TimeUnit.MILLISECONDS;
} else {
waitTime = unit.toMillis(leaseTime);
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS);
}
while (true) {
//尝试获取锁 死循环,不停的尝试获取所有需要的锁,
//只要没有全部获取到
//或者未超时那么就会一直死循环
if (tryLock(waitTime, leaseTime, unit)) {
return;
}
}
} 其实我们大概看了一下,MultiLock 这个东西最主要的逻辑不是lua脚本来实现,其实是一个 high level 高层次的API,基于low level 低层次的API 进行了封装
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
//设置remainTime 这个是个什么参数?猜一下
remainTime = unit.toMillis(waitTime);
}
//上锁等待时间
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit();
//保存已经获取到的锁
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
//这里底层获取锁
//这里使用的还是RedissonLock.tryLock(awaitTime, newLeaseTime。。。)方法,这个方法别忘记 4500毫秒必须获取到锁,如果获取不到就标记为失败,然后会启动一个watchdog 不停的去锁对应key的 过期时间,除非你传入了一个leaseTime 时间,就不会启动一个watchdog
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
//计算剩余时间,什么时间?就是去获取大锁的允许的时间长
//这块逻辑是意思?就是看你尝试获取多个锁的时间 是否超过了设置的时间
//最多不能超过假设4500ms
//超过了那么就会释放掉所有的锁,同时返回false,获取MultiLock 失败
//然后返回false 之后,会返回重新进入该方法,再来尝试4500ms中获取到multiLock
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
} 加锁其实到这已经结束了
那么如何释放锁?其实肯定也是调用底层原本RedissonLock 的API,最后肯定执行的是lua脚本对吧,这里我们就不详细讲了,看下源码打架都能理解
@Override
public void unlock() {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
//依次调用底层的RedissonLock方法异步释放锁
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
//异步转同步
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
} @Override
public RFuture<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
} @Override
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
本文标题:Redisson源码初探(七) MutiLock
本文链接:https://blog.quwenai.cn/post/2888.html
版权声明:本文不使用任何协议授权,您可以任何形式自由转载或使用。






还没有评论,来说两句吧...