栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
1.应用示例
可以用于多线程计算数据,最后合并计算结果的场景。
例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水。
先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
public class CyclicBarrierDemo implements Runnable {
private CyclicBarrier cyclicBarrier;
private int index;
public CyclicBarrierDemo(CyclicBarrier cyclicBarrier, int index) {
this.cyclicBarrier = cyclicBarrier;
this.index = index;
}
@Override
public void run() {
try {
System.out.println("特工" + index + "到达屏障");
index--;
// 线程计数+1
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
// 初始化 CyclicBarrier,并设置当所有线程到达屏障后要执行的任务(可选)
CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
@Override
public void run() {
System.out.println("所有人员到达屏障,准备开始执行秘密任务");
}
});
for(int i = 1; i <= 10; i++) {
new Thread(new CyclicBarrierDemo(cyclicBarrier, i)).start();
}
Thread.sleep(500);
System.out.println("将军也到屏障了");
cyclicBarrier.await();
}
}
执行解决如下:
特工1到达屏障
特工3到达屏障
特工2到达屏障
特工4到达屏障
特工5到达屏障
特工7到达屏障
特工6到达屏障
特工8到达屏障
特工9到达屏障
特工10到达屏障
将军也到屏障了
所有人员到达屏障,准备开始执行秘密任务
可以看到,当这11线程都 await(),即都到达屏障后,才会执行预设的任务。
2.源码分析
我们先来看看 CyclicBarrier 的核心成员变量及构造函数
public class CyclicBarrier {
// 每次使用屏障都会生成一个 Generation 实例
private static class Generation {
boolean broken = false;
}
// 可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 条件队列,除最后一个到达屏障的线程,其余线程都会阻塞在这个条件队列中
private final Condition trip = lock.newCondition();
// 所有应该达屏障的线程总数(也就是size)
private final int parties;
// 还没有达到屏障的线程数
int count
// 当所有线程到达时要执行的任务(构造时可选,即可为null)
private final Runnable barrierCommand;
// Generation 实例,在此处进行初始化
private Generation generation = new Generation();
// 指定 barrierCommand 的构造 CyclicBarrier
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
//...
}
注:跟前面说过的 CountDownLatch 和 Semaphore 直接通过内部类复用 AQS 不同,CyclicBarrier 复用更上层的 Reentrantlock。
await()
下面我们就从 await() 方法入手,来看看 Java 是如何实现 CyclicBarrier 的。
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 核心方法
// 入参是超时(默认false,关闭)
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait()
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取到已经初始化好 Reentrantlock(所有线程都用这一把锁)
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取到 Generation 实例
final Generation g = generation;
// 如果 Generation 已经被破坏了,抛异常
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程是否中断了,如果中断了,就 breakBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count-1 = 新的未到达屏障的线程
int index = --count;
// 如果达到屏障的线程数达到了预设值
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 获取构造器传入的 command
final Runnable command = barrierCommand;
// 如果 command != null,执行它
if (command != null)
command.run();
ranAction = true;
// 生成一个新的 Generation 实例
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 当所有线程还没有达到预设值,进入自旋
for (;;) {
try {
// 如果没有设置超时(默认false)
if (!timed)
// 进入 trip 的条件队列阻塞
trip.await();
// 如果设置了超时
else if (nanos > 0L)
// 进入条件队列计时阻塞
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 异常,breakBarrier
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// barrier broken 直接抛异常
if (g.broken)
throw new BrokenBarrierException();
// 所有线程到达 barrier 直接返回
if (g != generation)
return index;
// 等待超时直接抛异常, 重置 generation
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
当所有线程都到达了屏障,就会走到 nextGeneration() 方法
注:条件队列中的线程被唤醒后,回先去抢锁,抢到了别的什么都不干,就是直接释放锁。
nextGeneration()
private void nextGeneration() {
// signal completion of last generation
// 唤醒之前阻塞在 trip 条件队列的线程
// 注:这里其实是由最后一个来的线程执行的唤醒操作
trip.signalAll();
// set up next generation
count = parties;
// 建立下一代
generation = new Generation();
}
其余方法
/**
* 判断 barrier 是否 broken = true
*/
public boolean isBroken(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
return generation.broken;
}finally {
lock.unlock();
}
}
// 重置 barrier
public void reset(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
}finally {
lock.unlock();
}
}
/**
* 获取等待中的线程
*/
public int getNumberWaiting(){
final ReentrantLock lock = this.lock;
lock.lock();
try{
return parties - count;
}finally {
lock.unlock();
}
}
3.对比 CountDownLatch
关于 CountDownLatch 的使用示例及源码分析,请看这篇文章…
相同点
CountDownLatch 和 CyclicBarrier 都可以用于线程间协同
都是在构造器中传入预设值后,通过手动/自动的方式使之减减,在不满足等于0的条件下,线程会进入阻塞
不同点
1)在使用上
- CountDownLatch:提供的 api 是 await() -> countDown(),即需要手动减减;并且,在一个线程中可以多次调用 countDown()
- CyclicBarrier:提供的 api 只有 await(),它对于预设值的减减是自动的(统计调用 await() 的线程数);另外,还可以在构造器中传入一个所有线程到达屏障后要执行的最终任务
2)实现原理
- CountDownLatch 是直接复用 AQS,使用的是共享锁模式,操作的是 AQS 的 state
- CyclicBarrier 是复用更上层的 Reentrantlock,所以使用的是互斥锁模式,操作的是一个自定义计数器
注:虽然 CyclicBarrier 是互斥锁,但阻塞醒来并拿到锁后,就直接把锁释放了,与 CountDownLatch 使用 AQS 共享模式性能几乎无差。而且在判断到底使用哪一个时,更多的是在功能上考虑。






还没有评论,来说两句吧...