文章目录
注意:可以结合 AQS详解
一、ReentrantLock介绍
1. 聚合关系总结(类图)
类图如下:
注意:斜体为抽象类,下横线为接口
- ReentrantLock实现了Lock,Serializable接口
- ReentrantLock.Sync(内部类)继承了AQS
- ReentrantLock.NonfairSync和ReentrantLock.FairSync继承了ReentrantLock.Sync
- ReentrantLock持有ReentrantLock.Sync对象(实现锁功能)
2. 锁实现总结
ReentrantLock是基于独占模式模式
实现的
- 由Node节点组成一条同步队列(有head,tail两个指针,并且head初始化时指向空节点)
- int state标记锁使用数量(独占锁时,通常为1,发生重入时>1)
- lock()时加到队列尾部
- unlock()时,释放head节点,并指向下一个节点head=head.next,然后唤醒当前head节点
3. 性质
-
独占锁(排它锁):只能有一个线程获取锁
-
重入锁:
一个线程可以多次lock()
-
公平/非公平锁:只针对上锁过程
非公平锁:尝试获取锁,若成功立刻返回,失败则加入同步队列
公平锁:直接加入同步队列(当同步队列为空时会直接获得锁)
-
区别点 lock()过程(一阶段) tryAcquire()过程(二阶段) FairSync 直接acquire()
当前若无线程持有锁,如果同步队列为空,获取锁
NonFairSync 先尝试获取锁,再acquire()
当前若无线程持有锁,获取锁
二、Lock
1. Lock接口定义了锁的行为
public interface Lock {
//上锁(不响应Thread.interrupt()直到获取锁)
void lock();
//上锁(响应Thread.interrupt())
void lockInterruptibly() throws InterruptedException;
//尝试获取锁(以nonFair方式获取锁)
boolean tryLock();
//在指定时间内尝试获取锁(响应Thread.interrupt(),支持公平/二阶段非公平)
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解锁
void unlock();
//获取Condition
Condition newCondition();
}
2. lock()过程
2.1 锁具体实现
#java.util.concurrent.locks.ReentrantLock
private final Sync sync;
//根据传入参数选择FairSync或NonfairSync实现
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
#java.util.concurrent.locks.ReentrantLock.Sync
abstract void lock();
2.2 公平锁
加入同步队列(当同步队列为空时会直接获得锁
),等待锁
2.2.1 lock()
#java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
acquire(1);
}
2.2.2 acquire()
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.2.3 tryAcquire()
AQS模板方法,获取锁:
#java.util.concurrent.locks.ReentrantLock.FairSync
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//当前锁没被占用
if (!hasQueuedPredecessors() &&//1.判断同步队列中是否有节点在等待
compareAndSetState(0, acquires)) {//2.如果上面!1成立,修改state值(表明当前锁已被占用)
setExclusiveOwnerThread(current);//3.如果2成立,修改当前占用锁的线程为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//占用锁线程==当前线程(重入)
int nextc = c + acquires;//如果是,state继续加1,这里nextc的结果就会 > 1,这个判断表示获取到的锁的线程,还可以再获取锁,这里就是说的可重入的意思
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//修改status
return true;
}
return false;//直接获取锁失败
}
2.3.4 selfInterrupt()
产生一个中断。如果在acquireQueued()中当前线程被中断过,则需要产生一个中断。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
2.3 非公平锁
2.3.1 lock()
#java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {
//在acquire()之前先尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//acquire()流程与公平锁一模一样,唯一区别在于tryAcquire()实现中
acquire(1);
}
2.3.2 tryAcquire()
#java.util.concurrent.locks.ReentrantLock.NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
2.3.3 nonfairTryAcquire()
过程其实和FairSync.tryAcquire()基本一致,/唯一区别: 这里不会去判断队列中是否为空
#java.util.concurrent.locks.ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {//这个过程其实和FairSync.tryAcquire()基本一致
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//唯一区别: 这里不会去判断队列中是否为空
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3.unlock()过程
3.1 unlock()
#java.util.concurrent.locks.ReentrantLock
public void unlock() {
sync.release(1);
}
#java.util.concurrent.locks.ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//持有锁的线程==当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//重入锁全部释放
free = true;
//置空持有锁线程
setExclusiveOwnerThread(null);
}
//state==0(此时持有锁,不用cas)
setState(c);
return free;
}
3.2 release()
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁
Node h = head;
if (h != null &&//head节点为空(非公平锁直接获取锁)
h.waitStatus != 0)
unparkSuccessor(h);//唤醒同步队列中离head最近的一个waitStatus<=0的节点
return true;
}
return false;
}
3.3 tryRelease()
#java.util.concurrent.locks.ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//持有锁的线程==当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//重入锁全部释放
free = true;
//置空持有锁线程
setExclusiveOwnerThread(null);
}
//state==0(此时持有锁,不用cas)
setState(c);
return free;
}
4.lockInterruptibly()过程
lockInterruptibly()与lock()过程基本相同,区别在于Thread.intterpt()的应对措施不同
4.1 lockInterruptibly()
#java.util.concurrent.locks.ReentrantLock
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
4.2 acquireInterruptibly()
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
4.3 doAcquireInterruptibly()
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//唯一区别当Thread.intterpt()打断时,直接抛出异常
throw new InterruptedException();
}
} finally {
if (failed)//然后移除当前节点
cancelAcquire(node);
}
}
5.tryLock()
#java.util.concurrent.locks.ReentrantLock
public boolean tryLock() {
//尝试获取非公平锁
return sync.nonfairTryAcquire(1);
}
6. tryLock(long timeout, TimeUnit unit)
#java.util.concurrent.locks.ReentrantLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
6.1 tryAcquireNanos()
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||//获取锁(公平/非公平)
doAcquireNanos(arg, nanosTimeout);//在指定时间内等待锁(空转)
}
6.2 doAcquireNanos()
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
...
final long deadline = System.nanoTime() + nanosTimeout;
//加入队尾
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
//上面与acquireQueued()相同,重点看这里
//计算剩余时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//利用parkNanos()指定空转时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//如果被Thread.interrupt(),则抛异常
throw new InterruptedException();
}
} finally {
if (failed)//移除节点
cancelAcquire(node);
}
}
7.newCondition()
public Condition newCondition() {
return sync.newCondition();
}
#java.util.concurrent.locks.ReentrantLock.Sync
final ConditionObject newCondition() {
return new ConditionObject();
}