系统掌握并发编程系列(十)并发协同利器Phaser之概念讲解

系统掌握并发编程系列(一)精准理解线程的创建和停止

系统掌握并发编程系列(二)详解Thread类的主要属性和方法

系统掌握并发编程系列(三)一步步剖析线程返回值

系统掌握并发编程系列(四)详细分析传统并发协同方式(synchronized与wait() notify())

系统掌握并发编程系列(五)讲透传统并发协同方式伪唤醒与加锁失效问题

系统掌握并发编程系列(六)详细讲解并发协同利器CountDownLatch

系统掌握并发编程系列(七)详细讲解并发协同利器CyclicBarrier

系统掌握并发编程系列(八)详细讲解并发协同利器Semaphore

系统掌握并发编程系列(九)并发协同利器信号量的实战应用分析

回顾下之前已经学习的CountDownLatch、CyclicBarrier、Semaphore等工具的使用场景,CountDownLatch主要用于等待多个线程完成后继续执行或者多个线程等待同时执行,不能重复使用;CyclicBarrier主要用于多个线程等待同时执行或者多个线程多次等待同时执行,可以重复使用;Semaphore主要用于控制并发访问资源的线程数量。

在多个线程同时调用接口这个场景中,如果我们要在不同的时间段分阶段压测接口,用于模拟正式环境上不同时间段的系统访问量,以便观察系统在不同时间段的资源使用情况。在这个场景中,有两个重要的特征:多阶段重复调用,每个阶段能灵活调整参与调用接口的线程数量。似乎CountDownLatch、CyclicBarrier、Semaphore这个三个工具均不能满足,此时另一个并发协同工具Phaser出现了。

多阶段并发协同工具Phaser

Phase的字面的意思是“阶段”,‘’时期‘的意思,Phaser的字面意思是“相位器”,“移相器”的意思,从字面上理解就是多阶段的相位器。Phaser是JDK1.7以后新增的灵活的多阶段并发协同相位器,可以替代CountDownLatch和CyclicBarrier,适用于更复杂的并发协同需求‌。它允许多个线程在多个阶段上进行协同,而不是像CountDownLatch那样只能在一个点上进行协同。下面我们来学习下Phaser相关的一些基础概念。

注册与注销
/**
 * A reusable synchronization barrier, similar in functionality to
 * {@link CyclicBarrier} and {@link CountDownLatch} but supporting
 * more flexible usage.
 *
 * <p><b>Registration.</b> Unlike the case for other barriers, the
 * number of parties <em>registered</em> to synchronize on a phaser
 * may vary over time.  Tasks may be registered at any time (using
 * methods {@link #register}, {@link #bulkRegister}, or forms of
 * constructors establishing initial numbers of parties), and
 * optionally deregistered upon any arrival (using {@link
 * #arriveAndDeregister}).  As is the case with most basic
 * synchronization constructs, registration and deregistration affect
 * only internal counts; they do not establish any further internal
 * bookkeeping, so tasks cannot query whether they are registered.
 * (However, you can introduce such bookkeeping by subclassing this
 * class.)
 *
 */

注册是将一个线程加入Phaser相位器的过程,注销则是将一个已完成任务的线程从Phaser相位器中移除的过程。Phaser是一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。不同于其他屏障,注册在Phaser上同步的参与线程数量可以随时变化的。可以在任何时间调用register()方法、bulkRegister()进行注册,或者以构造函数的方式初始化参与线程数量;并且可以在任何线程到达屏障点的时候调用arriveAndDeregister()方法选择性的注销参与者。注册与注销仅仅是影响内部数量,内部并没有建立任何的记账。

阶段号协同
/** 
 * <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
 * Phaser} may be repeatedly awaited.  Method {@link
 * #arriveAndAwaitAdvance} has effect analogous to {@link
 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
 * generation of a phaser has an associated phase number. The phase
 * number starts at zero, and advances when all parties arrive at the
 * phaser, wrapping around to zero after reaching {@code
 * Integer.MAX_VALUE}. The use of phase numbers enables independent
 * control of actions upon arrival at a phaser and upon awaiting
 * others, via two kinds of methods that may be invoked by any
 * registered party:
 *
 * <ul>
 *
 *   <li><b>Arrival.</b> Methods {@link #arrive} and
 *       {@link #arriveAndDeregister} record arrival.  These methods
 *       do not block, but return an associated <em>arrival phase
 *       number</em>; that is, the phase number of the phaser to which
 *       the arrival applied. When the final party for a given phase
 *       arrives, an optional action is performed and the phase
 *       advances.  These actions are performed by the party
 *       triggering a phase advance, and are arranged by overriding
 *       method {@link #onAdvance(int, int)}, which also controls
 *       termination. Overriding this method is similar to, but more
 *       flexible than, providing a barrier action to a {@code
 *       CyclicBarrier}.
 *
 *   <li><b>Waiting.</b> Method {@link #awaitAdvance} requires an
 *       argument indicating an arrival phase number, and returns when
 *       the phaser advances to (or is already at) a different phase.
 *       Unlike similar constructions using {@code CyclicBarrier},
 *       method {@code awaitAdvance} continues to wait even if the
 *       waiting thread is interrupted. Interruptible and timeout
 *       versions are also available, but exceptions encountered while
 *       tasks wait interruptibly or with timeout do not change the
 *       state of the phaser. If necessary, you can perform any
 *       associated recovery within handlers of those exceptions,
 *       often after invoking {@code forceTermination}.  Phasers may
 *       also be used by tasks executing in a {@link ForkJoinPool}.
 *       Progress is ensured if the pool's parallelism level can
 *       accommodate the maximum number of simultaneously blocked
 *       parties.
 *
 * </ul> 
 */ 

阶段号(phase number)是指任务被划分成不同阶段的标识符,Phaser通过维护阶段号来协同多个线程的执行,每个阶段结束时,所有注册参与者都必须到达同步点才能进入下一阶段‌。类似于CyclicBarrier,Phaser是可以重复等待的,arriveAndAwaitAdvance()方法和CyclicBarrier的await()方法具有相同的效果。每一阶段的相位器有一个关联的阶段号,这个阶段号从0开始,当所有参与者线程到达相位器后就会增加,最大能增加到 Integer.MAX_VALUE(2^31 - 1),然后重置为0。阶段号的使用能独立的控制到达相位器的动作,以及在相位器上等待其他线程的动作,任务参与方都可以通过调用两类方法来控制:

(1)到达类方法。arrive()方法和arriveAndDeregister()方法报告到达,这类方法是不阻塞的,但会返回一个关联的到达阶段号。当当前阶段的最后一个参与者线程到达后,一个可选的动作将被执行,并且进入下一个阶段。参与者线程执行这些动作触发了进入下一个阶段,并且通过覆盖onAdvance(int, int)方法执行结束动作。覆盖该方法执行结束动作类似执行CyclicBarrier中的屏障动作,但更灵活。

(2)等待类方法。awaitAdvance(int phase)方法,调用该方法表示在指定阶段等待,需要一个表示阶段号的参数,并返回当相位器进入了下一个不同的阶段。不同于CyclicBarrier,调用awaitAdvance()方法后即使被中断线程也会继续等待。可以使用可以被中断和带超时版本的awaitAdvance()方法,但是在线程等待的过程中发生的中断或者超时不会改变相位器的状态。

相位器终止
/**  
 * <p><b>Termination.</b> A phaser may enter a <em>termination</em>
 * state, that may be checked using method {@link #isTerminated}. Upon
 * termination, all synchronization methods immediately return without
 * waiting for advance, as indicated by a negative return value.
 * Similarly, attempts to register upon termination have no effect.
 * Termination is triggered when an invocation of {@code onAdvance}
 * returns {@code true}. The default implementation returns {@code
 * true} if a deregistration has caused the number of registered
 * parties to become zero.  As illustrated below, when phasers control
 * actions with a fixed number of iterations, it is often convenient
 * to override this method to cause termination when the current phase
 * number reaches a threshold. Method {@link #forceTermination} is
 * also available to abruptly release waiting threads and allow them
 * to terminate.
 */  

相位器的终止‌是指当所有注册参与者线程都完成了当前阶段的任务后,相位器会自动进入下一个阶段,当所有阶段都完成了,相位器将不再继续等待,从而实现了任务的终止。一个相位器可以进入终止状态,通过调用isTerminated()方法来判断相位器是否处于终止状态。相位器终止之后,所有同步方法立即返回,通过返回一个负值表示不再继续等待下一个阶段了。同样,试图在终止状态的相位器上注册参与者是没有任何效果的。当调用onAdvance()方法并返回true的时候,相位器终止被触发。当注销动作导致注册参与者的数量变为0是默认的终止方式。调用forceTermination()方法也可以立即释放等待线程并允许他们终止。

相位器分层
/**   
 * <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
 * constructed in tree structures) to reduce contention. Phasers with
 * large numbers of parties that would otherwise experience heavy
 * synchronization contention costs may instead be set up so that
 * groups of sub-phasers share a common parent.  This may greatly
 * increase throughput even though it incurs greater per-operation
 * overhead.
 *
 * <p>In a tree of tiered phasers, registration and deregistration of
 * child phasers with their parent are managed automatically.
 * Whenever the number of registered parties of a child phaser becomes
 * non-zero (as established in the {@link #Phaser(Phaser,int)}
 * constructor, {@link #register}, or {@link #bulkRegister}), the
 * child phaser is registered with its parent.  Whenever the number of
 * registered parties becomes zero as the result of an invocation of
 * {@link #arriveAndDeregister}, the child phaser is deregistered
 * from its parent.
 *
 */   

相位器可以分层的,比如树形结构,以减少竞争。如果相位器的参与线程数量巨大,那么将会带来沉重的同步竞争消耗,此时应该建立共用一个父相位器的子相位器组,这样也许会大大提高系统的吞吐能力。

在树形结构的相位器中,父相位器中的子相位器的注册和注销是自动管理的。当子相位器的注册参与者的数量不为0的时候(比如通过Phaser(Phaser,int)初始化,调用register()方法、bulkRegister()注册),子相位器将会被注册到他们父相位器中。当子相位器的注册参与者的数量为0的时候(调用arriveAndDeregister()方法),子相位器将会从父相位器中注销。

相位器监控
/** 
 * <p><b>Monitoring.</b> While synchronization methods may be invoked
 * only by registered parties, the current state of a phaser may be
 * monitored by any caller.  At any given moment there are {@link
 * #getRegisteredParties} parties in total, of which {@link
 * #getArrivedParties} have arrived at the current phase ({@link
 * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
 * parties arrive, the phase advances.  The values returned by these
 * methods may reflect transient states and so are not in general
 * useful for synchronization control.  Method {@link #toString}
 * returns snapshots of these state queries in a form convenient for
 * informal monitoring.
 *
 */  

当同步方法仅仅是被注册的参与者调用时,相位器的当前状态是可以被任意一个调用者监控的。任何时候,调用getRegisteredParties()方法返回所有的注册参与者的数量,调用getArrivedParties()方法返回已经达到当前相位器的参与者数量,调用getUnarrivedParties()方法返回还没有达到相位器的的参与者数量,当剩下未到达相位器的参与者全部到达后,就会进入下一个阶段。这些方法的返回值反应的是一个瞬间状态,通常在同步控制中没有普遍性的用处。toString()方法返回这些查询状态的快照,通常用于非正式的监控。

总结

本位介绍了Phaser相位器的概念和使用场景,讲解了线程在Phaser相位器上注册与注销的概念、Phaser相位器中阶段号的概念、Phaser相位器分层的背景与作用,最后讲解了相位器监控的一些常用方法,希望通过系统的学些这些概念能加深对Phaser相位器的了解。关于Phaser相位器的常用方法,我们放到下一篇进行讲解。

如果文章对您有帮助,不妨“帧栈”一下,关注“帧栈”公众号,第一时间获取推送文章,您的肯定将是我写作的动力!

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值