Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目。
1.应用示例
场景:资源访问,服务限流
public class SemaphoreDemo {
public static void main(String[] args) {
// 限流,同一时刻最多能有两个线程执行
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i ++) {
new Thread(new Task(semaphore, "zhangsan +" + i)).start();
}
}
static class Task extends Thread {
Semaphore semaphore;
public Task(Semaphore semaphore, String threadname) {
this.semaphore = semaphore;
this.setName(threadname);
}
public void run() {
try {
// 申请资源(同时最多两个thread)
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + ": acquire() at time:" + System.currentTimeMillis());
Thread.sleep(1000);
// 释放资源
semaphore.release();
System.out.println(Thread.currentThread().getName() + ": release() at time:" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
结果如下:
Thread-3: acquire() at time:1618487257823
Thread-1: acquire() at time:1618487257823
Thread-3: release() at time:1618487258824
Thread-5: acquire() at time:1618487258824
Thread-1: release() at time:1618487258824
Thread-9: acquire() at time:1618487258824
Thread-7: acquire() at time:1618487259825
Thread-9: release() at time:1618487259825
Thread-5: release() at time:1618487259825
Thread-7: release() at time:1618487260825
可以看到两点
- 同一时刻最多只能有两个线程同时执行
- 只有一个线程 release() 后,才能有一个新线程 acquire()
2.源码分析
Semaphore 的源码实现其实跟 CountDownLatch 很相似,都是通过直接复用 AQS 来实现的(内部类 Sync);
PS:关于 CountDownLatch 的使用示例及源码分析,请看这篇文章…
我们先来看看 Semaphore 的构造函数做了什么
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
Semaphore 的构造函数实际上是构造了一个 NonfairSync(非公平锁),并传入了我们预设的最大允许执行线程数
可以看到,NonfairSync 实际上是调用了 AQS 的构造函数,是将 AQS 的 state 从默认的 1 改成了我们预设的 permits。这也正是 Semaphore 能同时多个线程执行的关键所在。
2.1 acquire()
当使用 Semaphore 后,要申请资源时调用 acquire() 方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
再来看 acquireSharedInterruptibly() 方法,该方法是在 AQS 中的,Semaphore 的内部类 Sync 并没有自己实现/重写:

这里有两个核心方法,tryAcquireShared() 和 doAcquireInterruptibly(),它俩的关系是,当 tryAcquireShared() 判断无法拿到锁时,就会进入 AQS 的 doAcquireInterruptibly() 方法。
tryAcquireShared()
这个方法最终会走到内部类 Sync 的 nonfairTryAcquireShared() 方法
// Semaphore.Sync
final int nonfairTryAcquireShared(int acquires) {
// 自旋
for (;;) {
int available = getState();
// 计算修改后的 AQS 的 state(减去当前线程要申请的资源数)
int remaining = available - acquires;
// 当修改后的 state 大于 0,且 CAS 能够修改成功时,可以拿锁
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
doAcquireSharedInterruptibly()
// AQS
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将当前线程封装为node(共享模式),并加到同步队列队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 自旋,保证所有被唤醒的线程都能依次恢复运行
for (;;) {
final Node p = node.predecessor();
// 当前node前进到队二 && tryAcquire成功(state减到0),就可以执行了
if (p == head) {
// 判断是否能拿到锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// setHeadAndPropagate 会调用 doReleaseShared 去唤醒后续 Shared 节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 其余线程阻塞(最后也是在此处醒来)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
注:虽然是共享锁模式,一个线程拿锁之后会唤醒同步队列中后续的 Shared 节点,但醒了后还要判断是否能拿到锁(tryAcquireShared),所以还会再次陷入休眠。(其实我觉得这里或许可以用互斥锁模式,欢迎讨论。。。)
2.2 release()
当使用 Semaphore,某个线程要释放资源时,调用 release()
public void release() {
sync.releaseShared(1);
}
releaseShared()
// AQS
public final boolean releaseShared(int arg) {
// 判断能否释放锁(修改state)
if (tryReleaseShared(arg)) {
// 唤醒后续线程
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()
// Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
// 自旋,保证 state 修改一定能成功
for (;;) {
int current = getState();
// 将 state 加上当前线程锁持有的资源数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 修改 state
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared()
// AQS
private void doReleaseShared() {
// 自旋,保证所有线程正常的线程都能被唤醒
for (;;) {
Node h = head;
// 还没有到队尾,此时队列中至少有两个节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头结点状态是 SIGNAL ,说明后续节点都需要唤醒
if (ws == Node.SIGNAL) {
// CAS 保证只有一个节点可以运行唤醒的操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 进行唤醒操作
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 退出自旋条件 h==head,一般出现于以下两种情况
// 第一种情况,头节点没有发生移动,结束。
// 第二种情况,因为此方法可以被两处调用,一次是获得锁的地方,一处是释放锁的地方,
// 加上共享锁的特性就是可以多个线程获得锁,也可以释放锁,这就导致头节点可能会发生变化,
// 如果头节点发生了变化,就继续循环,一直循环到头节点不变化时,结束循环。
if (h == head) // loop if head changed
break;
}
}






还没有评论,来说两句吧...