lock实现ReentrantLock分析
ReentrantLock 是 JUC 中提供的可中断, 可重入获取, 支持超时, 支持尝试获取锁
ReentrantLock 内部封装了成员变量 private final Sync sync;
1. lock操作
1.1 非公平锁 ,获取锁的方式是随机竞争的,新来的线程也有机会抢占锁
public ReentrantLock() { sync = new NonfairSync();}
源码解析:
一些重要的点我提出来说一下(先看源码):
1. 非公平锁体现在新来的线程(还未加入到同步阻塞队列中)是有机会与队列中的线程争抢锁的,但是同步阻塞队列本质上还是一个FIFO队列。进入队列后节点调用acquireQueued方法让自己阻塞或者尝试获取锁,但是不是每个节点都有获取锁的资格,acquireQueued方法还保证了只有头节点的后继节点才有资格去争抢锁。当线程获取了锁之后,意味着原先的头节点(可能是初始化队列时的傀儡空节点)就会从这个同步队列中移除。同时还会设置获取同步状态的此线程为头节点。
2. 我们都知道等待队列中的线程节点是无法争抢锁的,它们必须先被唤醒,然后加入到同步队列中。同时,同步阻塞队列中的线程节点并不是在不停的尝试争抢锁,而是处于阻塞状态。那么,什么时候应该被阻塞park,什么时候应该解除阻塞unpark(这里为了避免误解,用 “解除阻塞” 而不是 “唤醒”)。线程进入阻塞队列后的逻辑在acquireQueued方法中。
3. 为什么必须前驱结点为头结点才尝试去获取锁?因为头结点表示当前正占有锁的线程(或者未无意义的空节点),正常情况下该线程释放锁后会通知后面结点中阻塞的线程(查看锁释放流程源码),阻塞线程被唤醒后去获取锁,这是我们希望看到的。然而还有一种情况,就是前驱结点取消了等待,此时当前线程也会被唤醒,这时候就不应该去获取锁,而是往前回溯一直找到一个没有取消等待的结点,然后将自身连接在它后面。一旦我们成功获取了锁并成功将自身设置为头结点,就会跳出for循环。否则就会执行第二个if分句:确保前驱结点的状态为SIGNAL,然后阻塞当前线程。
4. 同步队列中,线程在阻塞自己前必须设置前驱结点的状态为SIGNAL,否则它不会阻塞自己,同时阻塞的线程都是由前驱节点进行唤醒(解除阻塞)的。
final void lock() {
/* 新来的线程通过CAS方式尝试将自己状态置为加锁状态(具体实现由JVM完成,cas可以自行了解)
这里是非公平模式实现要点,这样做主要是为了新来的线程和排队中的线程竞争,哪个线程抢的
快,哪个就能拿到锁 */
if (compareAndSetState(0, 1))
// 如果设置加锁状态成功,则设置AbstractQueuedSynchronizer中的全局变量线程为当前当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果CAS快速获取锁方式失败,则执行通用获取锁逻辑。
acquire(1);
}
public final void acquire(int arg) {
// 调用tryAcquire(arg)获取锁
if (!tryAcquire(arg) &&
// 如果获取失败,则调用addWaiter方法进行排队操作,将当期线程构造成节点,加入到同步阻塞队列中,
// 这里Node.EXCLUSIVE常量为null。之后调用acquireQueued方法进行自旋获取锁操作
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 这里涉及到ReetrantLock的结构,自行看源码即可。此处调用的是非公平锁的实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 为0表示当前状态未加锁
if (c == 0) {
// 继续尝试加锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 已经加锁,且当前正在执行的线程是此线程,则表示这是一次锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新锁状态值
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
// 将当前线程构造成新的节点,其后置节点为mode,这里是null
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 队尾节点不为空,则从尾部入队
// 设置前置节点
node.prev = pred;
// cas快速添加到尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队尾节点为空(队列还未初始化)或上述方式不成功,则调用enq方法入队
enq(node);
return node;
}
/**
* 创建以当前线程为基础的节点,先走快速添加到尾部逻辑,获取尾节点如果尾节点存在,将当前节点和尾节点相连,
* 并用CAS方式将当前节点设置为尾节点,这边使用CAS方式考虑了多个线程同时操作尾节点的情况,
* 所以如果尾节点已经变更则快速添加节点操作失败,调用enq(node)方法走最大努力添加节点的逻辑。
* enq(node)最大努力添加逻辑就是一直添加节点直到添加节点到尾部成功。
*/
private Node enq(final Node node) {
// 常规的队列加入操作,死循环直到成功加入
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// **** 如果头尾节点为空则创建一个傀儡空节点当头尾节点,用以初始化队列
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 线程加入同步队列后的运行逻辑
final boolean acquireQueued(final Node node, int arg) {
// 这个循环的主要作用就是在线程激活后尝试获取锁直到成功
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取其前置节点
final Node p = node.predecessor();
// 只有当前节点的前置节点为头结点时,当前节点才有资格进行尝试获取锁操作
if (p == head && tryAcquire(arg)) {
//在设置头节点的过程中不需要任何的同步操作,因为独占式锁中能获取同步状态的必定是同一个线程。
setHead(node);
// 从队列中删除原头节点。原头节点的前置节点为null,thred为null,将next也设为null方便GC
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否需要阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果需要阻塞等待则调用parkAndCheckInterrupt()阻塞此线程不再尝试,并让出cup资源直到被前一个节点激活
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//判断前置线程节点的状态是否是SIGNAL状态,如果是则表示前置线程节点已经被通知,这样当前线程就可以阻塞安心的等待上个节点的激活
if (ws == Node.SIGNAL)
return true;
// 状态为CANCELLED,则一直往队列头部回溯直到找到一个状态不为CANCELLED的结点,将当前节点node挂在这个结点的后面
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 这里也就意味着状态为0,是初始状态(头节点的状态),则通过csa将状态置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
其实这个方法的含义很简单,就是确保当前结点的前驱结点的状态为SIGNAL,SIGNAL意味着线程释放锁后会唤醒后面阻塞的线程。毕竟,只有确保能够被唤醒,当前线程才能放心的阻塞。
但是要注意只有在前驱结点已经是SIGNAL状态后才会返回true刻执行acquireQueued后面的步骤,立即阻塞,
对应上面的第一种情况。其他两种情况则因为返回false而重新执行一遍for循环。这种延迟阻塞其实也是一种
高并发场景下的优化,试想我如果在重新执行循环的时候成功获取了锁,是不是线程阻塞唤醒的开销就省了呢?
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
(以下参考https://www.cnblogs.com/takumicx/p/9402021.html)
为什么基于FIFO的同步队列可以实现非公平锁?
由FIFO队列的特性知,先加入同步队列等待的线程会比后加入的线程更靠近队列的头部,那么它将比后者更早的被唤醒,它也就能更早的得到锁。从这个意义上,对于在同步队列中等待的线程而言,它们获得锁的顺序和加入同步队列的顺序一致,这显然是一种公平模式。然而,线程并非只有在加入队列后才有机会获得锁,哪怕同步队列中已有线程在等待,非公平锁的不公平之处就在于此。回看下非公平锁的加锁流程,线程在进入同步队列等待之前有两次抢占锁的机会:
- 第一次是非重入式的获取锁,只有在当前锁未被任何线程占有(包括自身)时才能成功;
- 第二次是在进入同步队列前,包含所有情况的获取锁的方式。
只有这两次获取锁都失败后,线程才会构造结点并加入同步队列等待。而线程释放锁时是先释放锁(修改state值),然后才唤醒后继结点的线程的。试想下这种情况,线程A已经释放锁,但还没来得及唤醒后继线程C,而这时另一个线程B刚好尝试获取锁,此时锁恰好不被任何线程持有,它将成功获取锁而不用加入队列等待。线程C被唤醒尝试获取锁,而此时锁已经被线程B抢占,故而其获取失败并继续在队列中等待。
为什么非公平锁性能好
非公平锁在高并发情况下会出现的饥饿问题,在锁竞争激烈的情况下,在队列中等待的线程可能迟迟竞争不到锁。但是考虑性能原因,非公平锁使用的还是很多。
非公平锁对锁的竞争是抢占式的(队列中线程除外),线程在进入等待队列前可以进行两次尝试,这大大增加了获取锁的机会。这种好处体现在两个方面:
-
1.线程不必加入等待队列就可以获得锁,不仅免去了构造结点并加入队列的繁琐操作,同时也节省了线程阻塞唤醒的开销,线程阻塞和唤醒涉及到线程上下文的切换和操作系统的系统调用,是非常耗时的。在高并发情况下,如果线程持有锁的时间非常短,短到线程入队阻塞的过程超过线程持有并释放锁的时间开销,那么这种抢占式特性对并发性能的提升会更加明显。
-
2.减少CAS竞争。如果线程必须要加入阻塞队列才能获取锁,那入队时CAS竞争将变得异常激烈,CAS操作虽然不会导致失败线程挂起,但不断失败重试导致的对CPU的浪费也不能忽视。除此之外,加锁流程中至少有两处通过将某些特殊情况提前来减少CAS操作的竞争,增加并发情况下的性能。一处就是获取锁时将非重入的情况提前,如下图所示
另一处就是入队的操作,将同步队列非空的情况提前处理
这两部分的代码在之后的通用逻辑处理中都有,很显然属于重复代码,但因为避免了执行无意义的流程代码,比如for循环,获取同步状态等,高并发场景下也能减少CAS竞争失败的可能。
1.2 公平锁,获取锁的方式是按照线程加锁的顺序来分配的,先来先得FIFO
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
公平锁的代码实现就不一一贴出来了,对比非公平锁主,要区别在于两个方法
1. 缺少了第一次的cas方式获取锁。
final void lock() {
this.acquire(1);
}
2. tryAcqyure方法,新增了 判断队列中是否有优先级更高的等待线程
protected final boolean tryAcquire(int var1) {
Thread var2 = Thread.currentThread();
int var3 = this.getState();
if (var3 == 0) {
// 判断队列中是否存在优先级更高的线程节点
if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, var1)) {
this.setExclusiveOwnerThread(var2);
return true;
}
} else if (var2 == this.getExclusiveOwnerThread()) {
int var4 = var3 + var1;
if (var4 < 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(var4);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
由于头结点是当前获取锁的线程,队列中的第二个结点代表的线程优先级最高。
那么我们只要判断队列中第二个结点是否存在以及这个结点是否代表当前线程就行了。这里分了两种情况进行探讨:
- 第二个结点已经完全插入,但是这个结点是否就是当前线程所在结点还未知,所以通过s.thread != Thread.currentThread()进行判断,如果为true,说明第二个结点代表其他线程。
- 第二个结点并未完全插入,我们知道结点入队一共分三步:
- 1.待插入结点的pre指针指向原尾结点
- 2.CAS更新尾指针
- 3.原尾结点的next指针指向新插入结点
所以(s = h.next) == null 就是用来判断2刚执行成功但还未执行3这种情况的。这种情况第二个结点必然属于其他线程。
以上两种情况都会使该方法返回true,即当前有优先级更高的线程在队列中等待,那么当前线程将不会执行CAS操作去获取锁,保证了线程获取锁的顺序与加入同步队列的顺序一致,很好的保证了公平性,但也增加了获取锁的成本。
2. unlock操作
不区分公平锁与非公平锁,内容相对也比较简单。
public void unlock() {sync.release(1);}
public final boolean release(int arg) {
// 确保锁(包括重入的情况)都释放
if (tryRelease(arg)) {
Node h = head;
// 锁释放完成后,如果头节点不为null,且状态值不等于0,则执行解除阻塞方法(>0意味着CANCELLD取消状
// 态,<0这里应该只能是SIGNAL,即-1状态,意味着下个节点正在阻塞等待。等于0说明还没被下个节点线程
// 调用shouldParkAfterFailedAcquire(p, node)方法设置成SIGNAL状态,也就说明下个节点线程还没被
// 阻塞)通知阻塞队列中的某个线程解除阻塞(规则定义在unparkSuccessor中)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 所以对于重入锁的情况,重入多少次就需要手动释放多少次
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 一般情况下只要唤醒后继结点的线程就行了,但是后继结点可能已经取消等待,所以从队列尾部往前回溯,找到离头结点最近的正常结点,并唤醒其线程。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//激活线程
LockSupport.unpark(s.thread);
}
3. 其他:
lock源码分析参考文章 https://www.cnblogs.com/takumicx/p/9402021.html
关于LockSupport的简单了解来自于https://www.cnblogs.com/takumicx/p/9328459.html
一个lock创建多个Condition,也就是说可以拥有多个等待条件队列,但是只有一个同步队列。
demo(多生产者多消费者模式)
在多生产者多消费者模式中,synchronized加锁方式,在执行通知时,需要调用notifyAll方法唤醒所有的消费者与生产者(这其实是额外的开销,最好的情况是通知某一个生产者或者某个消费者),而不能用notify方法(因为可能会造成生产者唤醒生产者,这种现象累积多了,就会导致唤醒的全是生产者,最终程序假死)。使用lock/condtion机制可以利用两个Condition分别控制生产者线程和消费者线程。简易代码如下:
package pc4;
import pc2.MyThread;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author LiFeng
* @create 2019-10-11 上午 11:07
* @describe
*/
public class LockTest2 {
public static void main(String[] args) {
Resource res = new Resource();
MyThreadA[] threadAS = new MyThreadA[10];
MyThreadB[] threadBS = new MyThreadB[10];
for (int i = 0; i < 3; i++) {
threadAS[i] = new MyThreadA(res, i);
threadAS[i].start();
threadBS[i] = new MyThreadB(res, i);
threadBS[i].start();
}
}
}
class Resource {
private Lock lock = new ReentrantLock();
private Condition pc = lock.newCondition();
private Condition cc = lock.newCondition();
private boolean hasValue = false;
public void set() {
try {
lock.lock();
while (hasValue) {
System.out.println(Thread.currentThread().getName() + " | set wait");
pc.await();
}
System.out.println(Thread.currentThread().getName() + " | set");
hasValue = true;
cc.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void get() {
try {
lock.lock();
while (!hasValue) {
System.out.println(Thread.currentThread().getName() + " | get wait");
cc.await();
}
System.out.println(Thread.currentThread().getName() + " | get");
hasValue = false;
pc.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
class MyThreadA extends Thread {
private Resource res;
MyThreadA(Resource res, int i) {
this.res = res;
setName("s" + i);
}
@Override
public void run() {
super.run();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
res.set();
}
}
}
class MyThreadB extends Thread {
private Resource res;
MyThreadB(Resource res, int i) {
this.res = res;
setName("g" + i);
}
@Override
public void run() {
super.run();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
res.get();
}
}
}