抽象队列同步器 AbstractQueuedSynchronizer,是一个用于构建锁、同步器、协作的工具类或者框架。
AQS的源码还是比较复杂的,所以应该先理解其作用,场景,再去理解他的结构,原理。
为什么要有AQS?
我们学过的ReentrantLock和semaphore都有共同点,类似个闸门,只允许一定数量的线程通过。还都是同步的,底层都是AQS.
他们的原理里都有个继承AQS的内部类
AQS的比喻 — 面试
一群人去面试,HR要安排签到、叫号、先来后到等工作,其实就类似与AQS。
而面试官不用关心两个面试者是否号码冲突,也不用管他们的排队等待,也不用管叫号唤醒这些都交给AQS去做。
面试官只要说出自己的需求如群面还是单面,如信号量类似于单面进一个出一个,CountDownLatch是群面,等人都到齐了才开始。剩下的唤醒啥的都交给HR。
总结来说AQS的工作:
- 同步状态state的原子性管理,基于state控制线程的入队阻塞与唤醒
- 阻塞队列的管理
有了AQS我们可以只用关注业务逻辑,甚至可以自己实现自己的线程协作类
java中AQS的实现类:
主要是线程池里的Worker、Reentrantlock里的公平非公平锁、读写锁里的公平非公平锁、Semphore里的公平非公平锁、CountDownLatch里的锁。
AQS源码原理
1. state 同步状态
private volatile int state;
//使用系统的CAS指令来实现compareAndSetState方法
state在每个实现类里的意义都不同:
- 在Semaphore表示剩余的许可证数量
- 在CountDownLatch里代表还需要倒数的数量
- 在ReenTrantLock里代表锁的占用情况,如可重入计数,0表示不被任何线程占有。
2. 控制线程抢锁和配合的FIFO队列
AQS会维护一个等待的线程队列 ,把线程都放到这个队列里,AQS进行排队管理。
当多个线程争用同一把锁的时候,必须有排队机制将那些没有拿到锁的线程串在一起;当锁释放的时候,锁管理器就会挑选一个线程来占有这个刚释放的锁
3. 需要子类去实现的获取、释放等方法
AQS里有很多protected修饰的方法,需要子类自定义实现的
获取方法:
需要依赖state变量,经常会阻塞。比如ReenTrantLock看state不是0就阻塞等待
释放方法:
在Semaphore,释放就是release方法,作用是释放一个许可证
在CountDownLatch里,释放就是countDown方法 ,作用倒数一个数即state-1
自己实现一个AQS
- 写一个类,想好协作的逻辑,实现获取/释放方法。
- 然后内部写一个类继承AbstractQueuedSynchronizer,最后根据想实现的是一个独占,就重写TryAcquire/TryRelease,共享的就实现TryAcquireShared/TryReleaseShared,
- 在第一步里的获取和释放方法里去调用内部类的acquire、release或者shared方法。
/**
* 这是一个类似于CountDownLatch的闩锁类,只不过个数为1,只需要触发signal即可。
* 因为闩锁是共享性的,所以它使用shared获取和释放方法
* <p>
* 代码取自AQS源码注释中举例
*/
public class OneShotLatch {
private Sync sync = new Sync();
public void await() throws InterruptedException {
// 调用AQS的acquireShared,这里的参数没有使用到,实际没啥意义
sync.acquireSharedInterruptibly(1);
}
public void signal() {
// 调用AQS的releaseShared
sync.releaseShared(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
/**
* if (tryAcquireShared(arg) < 0)
* doAcquireSharedInterruptibly(arg);
*
* @return -1就会进入阻塞队列,1就是获取锁成功
*/
@Override
protected int tryAcquireShared(int arg) {
// state int默认是0,不等于0说明release方法被执行了
// 线程这里getState()不是0,就返回1,根据代码就不会进入阻塞队列,就相当于获取到锁了
return getState() != 0 ? 1 : -1;
}
/**
* @return true代表应该唤醒队列中的线程了,false代表还没放闸应该进入阻塞队列等待
*/
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}
}
public static void main(String[] args) {
OneShotLatch oneShotLatch = new OneShotLatch();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 尝试获取latch。。。。");
try {
oneShotLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 开闸放行");
}).start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 开闸,不像countDownLatch需要每个线程倒数
oneShotLatch.signal();
}
}