什么是栅栏
Java中栅栏CyclicBarrier是一种同步机制,栅栏能够让一组线程到达一个同步点时被阻塞,直到这组线程中的最后一个线程到达同步点,所有被阻塞的线程才会被唤醒继续执行,即目的是只有某一组线程全部执行到同步点时,才继续执行,否则先到达同步点的线程将阻塞。
栅栏CyclicBarrier与闭锁CountDownLatch的功能类似,分析过源码后会简要列举二者区别。
栅栏实现思路
CyclicBarrier类的构造函数会要求使用者为parties变量赋初值,parties变量表示一组线程的总数量,CyclicBarrier内用计数器变量count表示未到达同步点线程的数量。当某一线程到达同步点时,则调用await()方法,此过程中会使count自减,此时如果计数器值非零,说明仍有线程未到达同步点,则当前线程进入阻塞状态,如果计数器值为零,说明全部线程到达了同步点,则唤醒全部线程继续执行。
栅栏源码分析
分析核心方法之前,先关注一下CyclicBarrier类中的成员变量以及构造方法,源码如下:
//重入锁主要用于保证线程安全
private final ReentrantLock lock = new ReentrantLock();
//Condition用于阻塞和唤醒线程
private final Condition trip = lock.newCondition();
//表示一组线程的总数量
private final int parties;
//本变量表示当所有线程到达同步点且被唤醒前 将要被执行的操作 (可以为null 表示无操作)
private final Runnable barrierCommand;
private Generation generation = new Generation();
//表示还未执行到同步点的线程数量 当该变量为0时表示所有线程均已执行到同步点
private int count;
// 主要进行初始化操作 barrierAction参数说明同变量barrierCommand
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
接下来关注CyclicBarrier类中的静态内部类Generation及成员变量generation,为了讲解Generation方便,我们必须先了解到,与闭锁和信号量不同,CyclicBarrier是可重用的,具体实现会在后文提到,先来说明Generation:
/**
* 说到Generation代,很容易联想到垃圾收集的代,但笔者认为二者含义可以说是毫不相干
* 笔者认为此处的Generation更像是CyclicBarrier的唯一标识
* 其中布尔变量broken用于表示当前代是否已经被破坏
* 当前线程若判断当前代未被破坏 则正常执行
* 若判断已经被破坏 则不应继续执行 同时应唤醒所有在等待队列中的线程
* 下列几种情况,broken会被设置为true,表示当前代已被破坏
* 1.待加入等待队列或等待队列中的某线程被中断
* 2.barrierCommand执行时抛出异常
* 3.阻塞线程等待时间超出设定阈值
* 后文的源码分析会呈现以上情况
*
*/
private static class Generation {
boolean broken = false;
}
private Generation generation = new Generation();
下面着重关注CyclicBarrier的核心代码部分,源码如下:
/**
* 哪个线程调用了await()方法,就表示该线程已经执行到了同步点
* CyclicBarrier提供了两个版本的await()方法
* 无论是否设定时间阈值 最终都要调用dowait()方法 核心逻辑都在此方法中
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
//CyclicBarrier的核心方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
/**
* 首先线程尝试获取独占锁
* 以此保证不会有多个线程同时修改count和generation等变量 保证了线程安全
*/
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取当前代
final Generation g = generation;
//如果当前代已被破坏 则抛出异常
if (g.broken)
throw new BrokenBarrierException();
/**
* 对应Generation注释中的情况1 待加入等待队列的线程被中断 则判定当前代被破坏
* 此处breakBarrier()会将当前代的broken设为true 同时唤醒其他线程
* 这样如果其他被唤醒的线程再次执行await()方法
* 便会止步于↑↑↑前一个判断并抛出异常
*/
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
/**
* index用于表示剩余未到达同步点的线程数量
* 至于为什么此处不直接使用count变量 笔者猜想是基于线程安全考虑
* 纵观dowait方法
* 虽然同时只有一个线程可以持有锁 且dowait方法内除自减再无对count的写操作
* 但仍存在用于重置栅栏的非私有方法reset()可以并发的修改count的值 进而引发线程安全问题
*/
int index = --count;
//index为0 表示全部线程执行到同步点
if (index == 0) {
//用于判断下列try块中代码(具体是barrierCommand)是否顺利执行
boolean ranAction = false;
try {
//在唤醒其他线程前 若有指定 则优先执行barrierCommand
final Runnable command = barrierCommand;
if (command != null)
command.run();
//执行成功
ranAction = true;
//换代操作
//该方法会唤醒等待队列中的线程 创建一个新的Generation实例 并重置count
nextGeneration();
return 0;
} finally {
/*
* 如果ranAction为false 表示barrierCommand执行过程中出现异常
* 对应了Generation注释中的情况2
*/
if (!ranAction)
breakBarrier();
}
}
// 若能执行到这里 代表仍有线程未到达同步点
for (;;) {
try {
/**
* 根据是否设定了时间阈值 分别调用不同版本的await方法
* await方法要做的是将当前线程加入到等待队列中(阻塞)
*/
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
/**
* 捕获到了中断异常 仍对应Generation注释中的情况1 需要将broken表示设置为true
*/
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//如果不满足if条件 说明线程不属于当前一代 中断
Thread.currentThread().interrupt();
}
}
//检测到当前代已被破坏 抛出异常
if (g.broken)
throw new BrokenBarrierException();
//dowait执行过程中 或执行了nextGeneration或外界调用reset 已经换代 方法返回
if (g != generation)
return index;
/**
* 等待超时 对应Generation注释中的情况三 将broken设为true
*/
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//无论执行结果如何 释放重入锁 以便同步队列中的其他线程尝试获取锁
lock.unlock();
}
}
最后简要分析其余方法,源码如下:
/**
* 换代操作 若其他方法中也出现了本方法中注释过的方法 后文不再注释
* 此处未尝试获取锁 是因为该方法是私有方法 且只会在尝试获取锁的方法dowait中被调用 线程安全
*/
private void nextGeneration() {
// 唤醒等待队列中全部线程
trip.signalAll();
// 重置count计数器
count = parties;
//创建一个新的Generation实例 表示换代
generation = new Generation();
}
//未尝试获取锁与nextGeneration()同理
private void breakBarrier() {
//标记当前代已被破坏
generation.broken = true;
count = parties;
trip.signalAll();
}
//获取总线程数 原子操作 不需要获取锁
public int getParties() {
return parties;
}
//独占地判断当前代是否已经被破坏 保证其他线程不会同时对broken执行写操作 线程安全
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
//独占地重置CyclicBarrier
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
//独占地获取正在等待的线程数量 保证其他线程不会同时对count执行写操作 线程安全
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
栅栏使用的简单实例
假设有如下场景需求:假设有N组数据需要分别计算后,让各组计算结果对某一线程可见,再由该线程经过某些计算后得出最终结果,且该过程需要多次执行。可以考虑使用栅栏解决该问题,示例代码及结果如下
public class CyclicBarrierTest {
public static final int N = 3;
private static class Compute implements Runnable{
private final CyclicBarrier barrier;
private final Integer id;
private Compute(CyclicBarrier barrier, Integer id) {
this.barrier = barrier;
this.id = id;
}
@Override
public void run() {
System.out.println("开始执行第" + id + "组数据计算");
try {
//某些计算过程
Thread.sleep(500);
System.out.println("第" + id + "组数据计算完毕.主线程可见结果");
barrier.await();
System.out.println("全部" + id + "阻塞完毕");
} catch (Exception e) {}
}
}
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(N+1);
System.out.println("第一轮次计算");
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Compute(barrier,i));
thread.start();
}
try {
System.out.println("主线程阻塞等待其他线程计算结果");
barrier.await();
Thread.sleep(50);
System.out.println("主线程得到三组计算结果 计算并得到最终结果");
} catch (Exception e) {}
barrier.reset();
//仅创建一个线程 证明可重用即可
Thread.sleep(50);
System.out.println("第二轮次计算");
Thread thread = new Thread(new Compute(barrier,1));
thread.start();
}
}
第一轮次计算
主线程阻塞等待其他线程计算结果
开始执行第1组数据计算
开始执行第0组数据计算
开始执行第2组数据计算
第0组数据计算完毕.主线程可见结果
第1组数据计算完毕.主线程可见结果
第2组数据计算完毕.主线程可见结果
全部2阻塞完毕
全部0阻塞完毕
全部1阻塞完毕
主线程得到三组计算结果 计算并得到最终结果
第二轮次计算
开始执行第1组数据计算
第1组数据计算完毕.主线程可见结果
栅栏的部分特点与闭锁CountDownLatch的区别
- 栅栏用于等待一组线程全部执行到同步点,而闭锁用于某个或多个线程等待某外部事件执行完毕。
- 栅栏可重用,闭锁是一次性的。
- 栅栏用到了同步队列(获取释放锁)和等待队列(阻塞和唤醒线程),闭锁仅使用了同步队列。
以上便是本篇文章的全部内容
作者才疏学浅,如文中出现纰漏,还望指正