Redisson的基本使用
pom 依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.1</version>
</dependency> public static void main(String[] args) throws Exception {
//构建一个配置信息对象
Config config = new Config();
config.useClusterServers()
//定时扫描连接信息 默认1000ms
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7001");
//因为Redisson 是基于redis封装的一套便于复杂操作的框架
//所以这里构建对象肯定是创建一些与redis的连接
RedissonClient redisson = Redisson.create(config);
//这里是重点 获取锁,这也是重点分析的地方
RLock lock = redisson.getLock("lock");
//加锁
lock.lock();
//释放锁
lock.unlock();
} 通过Config对象去构建RedissonClient对象,可以大致看看,里面其实就是初始化一些配置 信息相关,以及与redis建立连接
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
} @Override
public RLock getLock(String name) {
//构建一个Lock对象,并传入了redis连接执行器(其实可以就理解为二次封装连接,里面会持有一个redis 连接),以及锁名称
return new RedissonLock(connectionManager.getCommandExecutor(), name);
} public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
//调用父类的构造方法 里面没有重要的操作,加入了一个序列化codec,设置了一下树形
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
//看门狗watchdog 的租约时间,这个比较重要,因为设计到lock 的续约 与 故障的自动释放锁
//这里面有一个比较好的设计思想
//默认30S
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
} 那么上面代码重要的操作是什么?尝试获取锁对象,根据锁名称,redis连接,以及一些其他配置构建一个RedissonLock对象,然后返回,当然别忘了比较重要的watchdog 看门狗
接下来就是lock()方法加锁,
@Override
public void lock() {
try {
//进行加锁
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} @Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
} 默认lock方法的leaseTime 为 -1,什么意思?就是只要加锁,就永久性的持有锁,除非宕机,watchdog是不是会发现,然后释放锁? 避免永久性的锁?这里聪明的你肯定也知道这个地方在生产上肯定用的比较少,我们一般还是会使用一个超时自动释放的锁
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//获取当前线程的ID,有什么用?注意可重入,如果不知道哪个线程,怎么重入?
long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//异步订阅线程ID
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
//重复尝试获取
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
} private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//如果不是永久性持有
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//根据配置获取watchDog 时间 默认30S 的 leaseTime 这里说明其实就算不设置,默认最长也持有30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
} 关键地方到了,lua 脚本 对吧 ,因为这个地方是加锁的核心之一。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//先判断指定key 是否已经设置
"if (redis.call('exists', KEYS[1]) == 0) then " +
//设置key 以及 对应的value,value是什么? 什么意思
//hset 是对于redis map数据结构
//其实就是针对key map 中的某个数据 设置为1
// ARGV[2] 到底是个什么东西?
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//过期时间设置 默认30S
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果已经存在了,再次使用当前key 以及 唯一标识 看是不是当前线程 持有
//进行可重入锁判断使用
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 对应的value +1 什么意思?就是可重入锁,进入了多少次
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//再次设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//这个是什么意思?pttl 就是返回当前key 还有多久的存活时间
"return redis.call('pttl', KEYS[1]);",
//internalLockLeaseTime, getLockName(threadId)) 可以注意后面
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
//根据key 去获取nodeSource 节点,为什么?
//因为如果我们是 3个M 3S,我们要知道key放在哪个Node上面 对吧
NodeSource source = getNodeSource(key);
//这里keys 是上面的数组,里面就一个元素 就是你指定的锁名称 KEY[1] 所以就代表指定的锁
//params 里放的是什么数据?internalLockLeaseTime, getLockName(threadId)),就是对应
//AGVS[1] AGVS[2] 一个是LockLeaseTime watchDog 的时长 getLockName(threadId)) = UUID + ":" + ThreadId 其实就是客户端上的一个线程的唯一标识
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
} private NodeSource getNodeSource(byte[] key) {
//redis cluster 默认slot 数量,是16384,无论你创建多大的一个redis cluster,它会统一
//有一个slot 的划分,将集群的存储空间划分我饿16384,这16384个slot就平均分布在各个master 实例上面,同时每个实例持有一个slot -》 实例的存储集,因此可能会访问两次redis 集群,第一次 获取key 所在的slot 对应的实例,第二次进行数据操作
//计算出slot
int slot = connectionManager.calcSlot(key);
//获取到对应slot 对应的实例信息
MasterSlaveEntry entry = connectionManager.getEntry(slot);
//封装为一个nodeSource
return new NodeSource(entry);
} 计算slot 其实就是基于hash 进行一下计算
@Override
public int calcSlot(byte[] key) {
if (key == null) {
return 0;
}
int start = indexOf(key, (byte)'{');
if (start != -1) {
int end = indexOf(key, (byte)'}');
key = Arrays.copyOfRange(key, start+1, end);
}
int result = CRC16.crc16(key) % MAX_SLOT;
return result;
} 执行脚本
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RPromise<R> mainPromise = createPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false, null);
return mainPromise;
} 加锁成功后的数据结构是什么?
首先我们要知道是 lockkey 对应一个Map 数据结构,然后唯一标识 设置为1,代表加锁成功,大致json结构如下,为什么要这样的结构?这是为了锁的可重入以及其他的设计,
lockKey{
’UUID:ThreadId‘ : 1
}
其实到这里我们可以知道,其实你并不能一直持有锁,默认30S后会释放掉锁,或者自动进行延期,但是30S 得是什么系统?
到这我们再思考一下:如果watchdog 要进行续约,或者递归调用了可重入锁,是不是还是走这段lua脚本?那么对应得操作会如何处理?
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);" 每次进来就会 +1,同时对锁进行延时
lockKey{
’UUID:ThreadId‘ : 2
}
其实这个对应得数字,可以代表什么意思?可以代表watchdog 重入次数,续约的操作另有隐情,那么现在就是如何进行延期,那么肯定是watchdog 来进行了处理。
其实到这我们已经大概知道了加锁逻辑,本质就是lua脚本 + redis + Map数据结构,但是我们应该思考接下来的问题
(1)如果某台机器上的某个线程已经进行了加锁,那么这台机器的其他线程尝试去加锁,肯定会失败,肯定会阻塞,那么又如何阻塞?阻塞后又如何醒来,尝试获取锁?
(2)如果某台机器上的某个线程已经进行了加锁,其他机器去尝试加锁,肯定也会失败并阻塞,如果阻塞?阻塞后又如何醒来,尝试获取锁?又是如何判断获取锁超时,然后放弃?
其实只要这几个关键的地方分析好了,其实redis 的分布式锁的设计思想我们大概也清楚了了








还没有评论,来说两句吧...