redisson 内部实现的还有一种semaphore 信号量的模式,这个是个什么模式?
大白话说就是资源池中存在一部分共享的资源,多个线程可以从资源池里面去获取资源,如果资源被获取完,那么其他向获取资源的线程就需要等待,别人释放资源。
其实逻辑很清晰
(1)设置资源池的数量 也就是设置对应的semaphore key的 value值
(2)尝试获取资源,如果足够 对资源池key 对应的value - 数量,如果不足够,进入死循环,不停的尝试获取
(3)释放资源,其实就是将对应的key的value 重新加入回来
(4)然后发布一条消息,让等待的client 去尝试获取资源
public static void main(String[] args) throws Exception {
//构建一个配置信息对象
Config config = new Config();
config.useClusterServers()
//定时扫描连接信息 默认1000ms
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7001");
//因为Redisson 是基于redis封装的一套便于复杂操作的框架
//所以这里构建对象肯定是创建一些与redis的连接
RedissonClient redisson = Redisson.create(config);
RSemaphore semaphore = redisson.getSemaphore("semaphore");
//设置资源数量 只需要设置一次
semaphore.trySetPermits(3);
//请求资源 默认获取一个,可以自己一次获取多个
semaphore.acquire();
//释放资源
semaphore.release();
} @Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//设置资源数量lua 脚本
"local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getName(), getChannelName()), permits);
} @Override
public void acquire(int permits) throws InterruptedException {
//尝试获取资源
if (tryAcquire(permits)) {
return;
}
//获取失败就进行循环尝试获取,订阅channel
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {
return;
}
getEntry().getLatch().acquire(permits);
}
} finally {
unsubscribe(future);
}
// get(acquireAsync(permits));
} @Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//尝试获取资源数量
"local value = redis.call('get', KEYS[1]); " +
//如果存在 且数量大于我们需要的,那么就将value - permits 返回1 代表成功 否则返回0代表失败
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), permits);
} 释放信号量
@Override
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
}
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
//将对应的permits 数量的信号量 加入到key value中
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
//发布订阅消息
"redis.call('publish', KEYS[2], value); ",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
本文标题:Redisson 源码初探(十) Semaphore 模式
本文链接:https://blog.quwenai.cn/post/2891.html
版权声明:本文不使用任何协议授权,您可以任何形式自由转载或使用。






还没有评论,来说两句吧...