Java并发编程之栅栏CyclicBarrier及源码详解

什么是栅栏

Java中栅栏CyclicBarrier是一种同步机制,栅栏能够让一组线程到达一个同步点时被阻塞,直到这组线程中的最后一个线程到达同步点,所有被阻塞的线程才会被唤醒继续执行,即目的是只有某一组线程全部执行到同步点时,才继续执行,否则先到达同步点的线程将阻塞。
栅栏CyclicBarrier与闭锁CountDownLatch的功能类似,分析过源码后会简要列举二者区别。

栅栏实现思路

CyclicBarrier类的构造函数会要求使用者为parties变量赋初值,parties变量表示一组线程的总数量,CyclicBarrier内用计数器变量count表示未到达同步点线程的数量。当某一线程到达同步点时,则调用await()方法,此过程中会使count自减,此时如果计数器值非零,说明仍有线程未到达同步点,则当前线程进入阻塞状态,如果计数器值为零,说明全部线程到达了同步点,则唤醒全部线程继续执行。

栅栏源码分析

分析核心方法之前,先关注一下CyclicBarrier类中的成员变量以及构造方法,源码如下:

	
	//重入锁主要用于保证线程安全
    private final ReentrantLock lock = new ReentrantLock();
    //Condition用于阻塞和唤醒线程
    private final Condition trip = lock.newCondition();
	//表示一组线程的总数量
    private final int parties;
	//本变量表示当所有线程到达同步点且被唤醒前 将要被执行的操作 (可以为null 表示无操作)
    private final Runnable barrierCommand;
	
    private Generation generation = new Generation();
	//表示还未执行到同步点的线程数量 当该变量为0时表示所有线程均已执行到同步点
	private int count;
	
	// 主要进行初始化操作 barrierAction参数说明同变量barrierCommand
	public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

接下来关注CyclicBarrier类中的静态内部类Generation及成员变量generation,为了讲解Generation方便,我们必须先了解到,与闭锁和信号量不同,CyclicBarrier是可重用的,具体实现会在后文提到,先来说明Generation:

	/**
	 * 说到Generation代,很容易联想到垃圾收集的代,但笔者认为二者含义可以说是毫不相干
	 * 笔者认为此处的Generation更像是CyclicBarrier的唯一标识
	 * 其中布尔变量broken用于表示当前代是否已经被破坏
	 * 当前线程若判断当前代未被破坏 则正常执行
	 * 若判断已经被破坏 则不应继续执行 同时应唤醒所有在等待队列中的线程 
	 * 下列几种情况,broken会被设置为true,表示当前代已被破坏
	 * 1.待加入等待队列或等待队列中的某线程被中断
	 * 2.barrierCommand执行时抛出异常
	 * 3.阻塞线程等待时间超出设定阈值
	 * 后文的源码分析会呈现以上情况
	 * 
	 */
	private static class Generation {
        boolean broken = false;
    }

	private Generation generation = new Generation();

下面着重关注CyclicBarrier的核心代码部分,源码如下:

	/**
	* 哪个线程调用了await()方法,就表示该线程已经执行到了同步点
	* CyclicBarrier提供了两个版本的await()方法
	* 无论是否设定时间阈值 最终都要调用dowait()方法 核心逻辑都在此方法中
	*/
	public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    
	//CyclicBarrier的核心方法
	private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        /**
        * 首先线程尝试获取独占锁
        * 以此保证不会有多个线程同时修改count和generation等变量 保证了线程安全
        */
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	//获取当前代
            final Generation g = generation;
			//如果当前代已被破坏 则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
			/**
			* 对应Generation注释中的情况1 待加入等待队列的线程被中断 则判定当前代被破坏
			* 此处breakBarrier()会将当前代的broken设为true 同时唤醒其他线程
			* 这样如果其他被唤醒的线程再次执行await()方法
			* 便会止步于↑↑↑前一个判断并抛出异常
			*/
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			/**
			* index用于表示剩余未到达同步点的线程数量
			* 至于为什么此处不直接使用count变量 笔者猜想是基于线程安全考虑
			* 纵观dowait方法 
			* 虽然同时只有一个线程可以持有锁 且dowait方法内除自减再无对count的写操作
			* 但仍存在用于重置栅栏的非私有方法reset()可以并发的修改count的值 进而引发线程安全问题
			*/
            int index = --count;
            //index为0 表示全部线程执行到同步点
            if (index == 0) {
            	//用于判断下列try块中代码(具体是barrierCommand)是否顺利执行
                boolean ranAction = false;
                try {
                	//在唤醒其他线程前 若有指定 则优先执行barrierCommand
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    //执行成功
                    ranAction = true;
                    //换代操作 
                    //该方法会唤醒等待队列中的线程 创建一个新的Generation实例 并重置count
                    nextGeneration();
                    return 0;
                } finally {
                	/*
                	* 如果ranAction为false 表示barrierCommand执行过程中出现异常
                	* 对应了Generation注释中的情况2
                	*/
                	
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 若能执行到这里 代表仍有线程未到达同步点
            for (;;) {
                try {
                	/**
                	* 根据是否设定了时间阈值 分别调用不同版本的await方法
                	* await方法要做的是将当前线程加入到等待队列中(阻塞)
					*/
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                	/**
                	* 捕获到了中断异常 仍对应Generation注释中的情况1 需要将broken表示设置为true
                	*/
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                    	//如果不满足if条件 说明线程不属于当前一代 中断                    	
                        Thread.currentThread().interrupt();
                    }
                }
				//检测到当前代已被破坏 抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
				//dowait执行过程中 或执行了nextGeneration或外界调用reset 已经换代 方法返回
                if (g != generation)
                    return index;
				/**
				* 等待超时 对应Generation注释中的情况三 将broken设为true
				*/
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
        	//无论执行结果如何 释放重入锁 以便同步队列中的其他线程尝试获取锁
            lock.unlock();
        }
    }
	

最后简要分析其余方法,源码如下:

	/**
	* 换代操作 若其他方法中也出现了本方法中注释过的方法 后文不再注释
	* 此处未尝试获取锁 是因为该方法是私有方法 且只会在尝试获取锁的方法dowait中被调用 线程安全
	*/
	private void nextGeneration() {
        // 唤醒等待队列中全部线程
        trip.signalAll();
        // 重置count计数器
        count = parties;
        //创建一个新的Generation实例 表示换代
        generation = new Generation();
    }
	//未尝试获取锁与nextGeneration()同理
	private void breakBarrier() {
		//标记当前代已被破坏
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
	//获取总线程数 原子操作 不需要获取锁
	public int getParties() {
        return parties;
    }
	//独占地判断当前代是否已经被破坏 保证其他线程不会同时对broken执行写操作 线程安全
	public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
    //独占地重置CyclicBarrier
	public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
	
	//独占地获取正在等待的线程数量 保证其他线程不会同时对count执行写操作 线程安全
	 public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

栅栏使用的简单实例

假设有如下场景需求:假设有N组数据需要分别计算后,让各组计算结果对某一线程可见,再由该线程经过某些计算后得出最终结果,且该过程需要多次执行。可以考虑使用栅栏解决该问题,示例代码及结果如下

	public class CyclicBarrierTest {

    public static final int N = 3;

    private static class Compute implements Runnable{

        private final CyclicBarrier barrier;
        private final Integer id;

        private Compute(CyclicBarrier barrier, Integer id) {
            this.barrier = barrier;
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println("开始执行第" + id + "组数据计算");
            try {
                //某些计算过程
                Thread.sleep(500);
                System.out.println("第" + id + "组数据计算完毕.主线程可见结果");
                barrier.await();
                System.out.println("全部" + id + "阻塞完毕");
            } catch (Exception e) {}
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(N+1);
        System.out.println("第一轮次计算");
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Compute(barrier,i));
            thread.start();
        }
        try {
            System.out.println("主线程阻塞等待其他线程计算结果");
            barrier.await();
            Thread.sleep(50);
            System.out.println("主线程得到三组计算结果 计算并得到最终结果");
        } catch (Exception e) {}
        barrier.reset();
        //仅创建一个线程 证明可重用即可
        Thread.sleep(50);
        System.out.println("第二轮次计算");
        Thread thread = new Thread(new Compute(barrier,1));
        thread.start();

    }

}
第一轮次计算
主线程阻塞等待其他线程计算结果
开始执行第1组数据计算
开始执行第0组数据计算
开始执行第2组数据计算
第0组数据计算完毕.主线程可见结果
第1组数据计算完毕.主线程可见结果
第2组数据计算完毕.主线程可见结果
全部2阻塞完毕
全部0阻塞完毕
全部1阻塞完毕
主线程得到三组计算结果 计算并得到最终结果
第二轮次计算
开始执行第1组数据计算
第1组数据计算完毕.主线程可见结果

栅栏的部分特点与闭锁CountDownLatch的区别

  • 栅栏用于等待一组线程全部执行到同步点,而闭锁用于某个或多个线程等待某外部事件执行完毕。
  • 栅栏可重用,闭锁是一次性的。
  • 栅栏用到了同步队列(获取释放锁)和等待队列(阻塞和唤醒线程),闭锁仅使用了同步队列。

以上便是本篇文章的全部内容
作者才疏学浅,如文中出现纰漏,还望指正

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

7rulyL1ar

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值