并发编程原理与实战(四)经典并发协同方式synchronized与wait+notify详解
点击上面链接快速查看前面系列文章,本文我们来分析传统并发协同方式中常见的伪唤醒问题以及synchronized加锁不生效的问题。
伪唤醒
在上一篇的中,官方在wait(long timeoutMillis, int nanos)方法的注释中提到,应该在while循环中检查是否满足等待条件,并在代码块中调用wait()方法以防止虚假唤醒问题,也就是常说的伪唤醒问题。那么什么情况下会出现伪唤醒问题?如何重现该问题?
首先,我们得弄清楚伪唤醒的含义,伪唤醒通常在多线程环境下出现,一个线程在等待某个条件时被唤醒后,实际上条件并没有满足,也就是线程在条件并没有满足的情况下被唤醒了。下面通过一个例子来说明重现该问题。
public class SpuriousWakeupsDemo {
//任务队列
private final Queue<Integer> taskQueue = new LinkedList<>();
//队列容量
private static final int MAX_QUEUE_SIZE = 3;
//锁对象
private final Object lockObject = new Object();
//生产任务
class ProducerTask implements Runnable {
@Override
public void run() {
int taskNumber = 1;
while (true) {
try {
synchronized (lockObject) {
// 队列已满时阻塞等待
while (taskQueue.size() >= MAX_QUEUE_SIZE) {
System.out.println("队列已满,"+Thread.currentThread().getName()+"等待...");
lockObject.wait();
}
//往队列里添加任务
taskQueue.offer(taskNumber);
System.out.println(Thread.currentThread().getName()+"添加任务: " + taskNumber);
taskNumber++;
//通知消费者可以消费,建议使用 notifyAll() 避免死锁
lockObject.notifyAll();
}
//模拟生产耗时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 消费任务
class ConsumerTask implements Runnable {
@Override
public void run() {
while (true) {
try {
synchronized (lockObject) {
// 队列为空时阻塞等待
if (taskQueue.isEmpty()) {
System.out.println("队列为空,"+Thread.currentThread().getName()+"等待...");
lockObject.wait();
}
// 消费任务
Integer task = taskQueue.poll();
if (task == null) {
System.out.println("队列为空,"+Thread.currentThread().getName()+"却被虚假唤醒了");
} else {
System.out.println(Thread.currentThread().getName()+"处理任务: " + task);
}
// 通知生产者可以生产,建议使用 notifyAll() 避免死锁
lockObject.notifyAll();
}
// 模拟消费耗时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
SpuriousWakeupsDemo demo = new SpuriousWakeupsDemo();
//创建生产者线程
Thread producer = new Thread(demo.new ProducerTask(), "ProducerThread");
//创建消费者线程
Thread consumer1 = new Thread(demo.new ConsumerTask(), "ConsumerThread-1");
Thread consumer2 = new Thread(demo.new ConsumerTask(), "ConsumerThread-2");
Thread consumer3 = new Thread(demo.new ConsumerTask(), "ConsumerThread-3");
//启动线程
producer.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
程序分析:
上面的例子是一个生产者-消费者模型,创建了一个指定容量大小的队列,生产者任务不断地往队列中添加任务,消费者任务则不断从队列中取出任务,通过synchronized和wait(),notifyAll()实现线程并发协同运行,执行“生产-消费”动作。然后创建了一个生产者线程和3个消费者线程,之所以消费者线程数量比生产者线程数量多,是为了加快消费速度,方便重现问题。这里特别地将消费者任务中判断队列是否不为空的判断用if条件来判断。我们看看运行结果:
ProducerThread添加任务: 1
ConsumerThread-1处理任务: 1
队列为空,ConsumerThread-2等待...
队列为空,ConsumerThread-3等待...
队列为空,ConsumerThread-1等待...
ProducerThread添加任务: 2
ConsumerThread-2处理任务: 2
队列为空,ConsumerThread-1却被虚假唤醒了
队列为空,ConsumerThread-3却被虚假唤醒了
ProducerThread添加任务: 3
ConsumerThread-3处理任务: 3
队列为空,ConsumerThread-2等待...
队列为空,ConsumerThread-1等待...
队列为空,ConsumerThread-3等待...
ProducerThread添加任务: 4
ConsumerThread-2处理任务: 4
队列为空,ConsumerThread-3却被虚假唤醒了
队列为空,ConsumerThread-1却被虚假唤醒了
...
从运行结果中可以看出,部分消费者线程取到的任务是空的,没有进入等待状态却被唤醒了,不符合队列不为空的时候进入等待状态的期望。问题就在于if条件判断,分析下运行结果:生产者线程往队列添加任务1后,消费者线程1读取任务了1,然后队列为空,3个消费者线程进入等待状态;生产者线程往队列添加任务2后同时唤醒3个消费者线程,由于是if条件判断,3个消费者线程被唤醒后并没有再次检查队列是否为空,直接从队列中拿任务,最终只有一个消费者线程拿到了任务,另外两个消费者线程被虚假唤醒了。从这个例子也可以看出,伪唤醒并不是什么问题,而是代码上的问题,由于while循环在线程被唤醒后都会先判断条件是否满足,修改为while循环判断后就解决了。运行结果如下:
ProducerThread添加任务: 1
ConsumerThread-3处理任务: 1
队列为空,ConsumerThread-2等待...
队列为空,ConsumerThread-1等待...
ProducerThread添加任务: 2
ConsumerThread-2处理任务: 2
队列为空,ConsumerThread-1等待...
队列为空,ConsumerThread-3等待...
ProducerThread添加任务: 3
ConsumerThread-1处理任务: 3
队列为空,ConsumerThread-3等待...
队列为空,ConsumerThread-2等待...
ProducerThread添加任务: 4
ConsumerThread-3处理任务: 4
队列为空,ConsumerThread-2等待...
队列为空,ConsumerThread-1等待...
...
synchronized加锁不生效场景
前文我们已经知道了synchronized的使用方法,但实际使用中经常会遇到使用了synchronized给指定的方法或者代码块加锁没有生效的问题,就是加锁不成功,下面我们来分析下有几种常见的会导致synchronized加锁不成功的场景。
1、锁对象不一致,或者说不是同一把锁。
多个线程调用同一个synchronized修饰的方法时要保证多个线程抢的是同一把锁,否则加锁不起作用。先看下面的这个例子,这个例子模拟定时执行统计任务,开启了多线程去定时执行指定的方法,期望是每次只能有一个线程执行任务,防止任务重复执行生成重复数据。
public class JobTask {
public synchronized void execute() {
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "开始执行任务...");
try {
//模拟执行耗时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "执行任务完成...");
}
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
JobTask jobTask = new JobTask();
jobTask.execute();
}, "t1");
Thread t2 = new Thread(() -> {
JobTask jobTask = new JobTask();
jobTask.execute();
}, "t2");
Thread t3 = new Thread(() -> {
JobTask jobTask = new JobTask();
jobTask.execute();
}, "t3");
t1.start();
t2.start();
t3.start();
}
}
例子中通过System.currentTimeMillis()打印当前的时间戳用来判断线程是否同时执行,运行结果:
1746670704571, t1开始执行任务...
1746670704583, t2开始执行任务...
1746670704583, t3开始执行任务...
1746670706583, t2执行任务完成...
1746670706583, t1执行任务完成...
1746670706586, t3执行任务完成...
从运行结果可以看出,一个线程没有执行完,其他线程就开始执行了,且同一时间有两个线程执行统计任务,明明加了synchronized,为什么没起作用?将synchronized移到方法里面对代码块进行加锁看看。
public void execute() {
synchronized(this) {
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "开始执行任务...");
try {
//模拟执行耗时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "执行任务完成...");
}
}
运行结果:
1746670933438, t1开始执行任务...
1746670933439, t2开始执行任务...
1746670933439, t3开始执行任务...
1746670935441, t3执行任务完成...
1746670935441, t2执行任务完成...
1746670935441, t1执行任务完成...
还是一样,synchronized对代码块加锁还是不成功。我们知道,synchronized修饰实例方法或者代码块时,锁对象是调用该方法的对象,上面的例子中,每次都是new JobTask()创建一个jobTask对象,再通过jobTask对象调用execute()方法,很明显锁对象不是同一个,我们通过System.identityHashCode()来打印下锁对象的hashCode。
public synchronized void execute() {
System.out.println("锁hashcode: "+System.identityHashCode(this));
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "开始执行任务...");
try {
//模拟执行耗时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "执行任务完成...");
}
运行结果:
锁hashcode: 501183200
锁hashcode: 547680545
1746672539597, t2开始执行任务...
锁hashcode: 350270973
1746672539599, t3开始执行任务...
1746672539599, t1开始执行任务...
1746672541609, t1执行任务完成...
1746672541609, t3执行任务完成...
1746672541610, t2执行任务完成...
从运行结果可以看到,每个线程调用execute()方法时锁的hashcode都不一样,说明不是同一把锁。改造下代码,每次调用execute()都是用同一个对象。
public static void main(String[] args) {
JobTask jobTask = new JobTask();
Thread t1 = new Thread(() ->jobTask.execute(), "t1");
Thread t2 = new Thread(() ->jobTask.execute(), "t2");
Thread t3 = new Thread(() ->jobTask.execute(), "t3");
t1.start();
t2.start();
t3.start();
}
运行结果:
锁hashcode: 1661881880
1746672708944, t1开始执行任务...
1746672710954, t1执行任务完成...
锁hashcode: 1661881880
1746672710954, t3开始执行任务...
1746672712964, t3执行任务完成...
锁hashcode: 1661881880
1746672712964, t2开始执行任务...
1746672714973, t2执行任务完成...
这次符合我们的期望了,锁对象也是同一个,每次只能有一个线程执行execute()方法,且执行完成下一个线程才能执行。
2、锁对象被修改。
改造下上面的例子,用一个Integer的变量来做锁对象。
public class JobTask {
//执行次数
private static Integer executeCount = 0;
public void execute() {
System.out.println("锁hashcode: "+System.identityHashCode(executeCount));
synchronized(executeCount) {
executeCount = executeCount + 1;
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "开始执行任务...");
try {
//模拟执行耗时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "执行任务完成...");
}
}
public static void main(String[] args) {
JobTask jobTask = new JobTask();
Thread t1 = new Thread(() ->jobTask.execute(), "t1");
Thread t2 = new Thread(() ->jobTask.execute(), "t2");
Thread t3 = new Thread(() ->jobTask.execute(), "t3");
t1.start();
t2.start();
t3.start();
}
}
运行结果:
锁hashcode: 2050380822
1746675690765, t1开始执行任务...
锁hashcode: 1908758489
1746675690765, t2开始执行任务...
锁hashcode: 1161626406
1746675690765, t3开始执行任务...
1746675692778, t1执行任务完成...
1746675692778, t2执行任务完成...
1746675692778, t3执行任务完成...
线程执行execute()后对锁对象进行修改,后面的线程执行执行execute()时拿到的锁对象已经不是同一个了,所以多个线程对execute()的访问不是互斥的。
3、锁对象冲突。
先看下面这个例子,期望是用两把锁控制execute1()方法,execute2()方法的执行。
public class JobTask {
public static final String lock1 = "lock";
public static final String lock2 = "lock";
public void execute1() {
System.out.println("lock1 hashcode: "+System.identityHashCode(lock1));
synchronized(lock1) {
doExecute();
}
}
public void execute2() {
System.out.println("lock2 hashcode: "+System.identityHashCode(lock2));
synchronized(lock2) {
doExecute();
}
}
private void doExecute() {
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "开始执行任务...");
try {
//模拟执行耗时
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + "执行任务完成...");
}
public static void main(String[] args) {
JobTask jobTask = new JobTask();
Thread t1 = new Thread(() ->jobTask.execute1(), "t1");
Thread t2 = new Thread(() ->jobTask.execute2(), "t2");
t1.start();
t2.start();
}
}
上面的代码中,使用两个相同字符串的不同对象作为两个锁对象,分别锁住execute1()方法和execute2()方法的执行,按我们的期望,两个方法使用的是不同的锁对象,应该同时执行。看看运行结果
lock1 hashcode: 1918590210
1746685050685, t1开始执行任务...
lock2 hashcode: 1918590210
1746685052691, t1执行任务完成...
1746685052692, t2开始执行任务...
1746685054692, t2执行任务完成...
从运行结果可以看出,execute1()方法和execute2()方法并没有同时执行而是互斥执行,实际上用的都是同一把锁,原因就在于java的字符串常量池。直接使用双引号创建字符串时,这个字符串会添加到字符串常量池中。当创建一个字符串对象时,JVM先检查常量池中是否存在相同的字符串,如果存在,直接返回常量池中该字符串的引用。所以会看到lock1和lock2的hashcode是一样的,因为他们实际上是同一个引用。
4、分布式环境下使用。
synchronized只适合在单机环境下使用,在分布式环境下使用synchronized加锁是不起作用的,因为synchronized是jvm层面上的锁,分部署环境下往往是同一套程序部署在不同的机器上,每台机器上都有一个jvm运行,他们是相互独立的。分布式环境下可以使用基于redis实现的或者基于zk实现的分部式锁,后面我们会学到。
上面列举的synchronized失效的场景,不管是哪种,其核心思想都是围绕多个线程“抢的是不是同一把锁”来分析,只有上的是同一把锁,才能加锁成功,要跳出使用synchronized了就能加锁的误区。
总结
本文通过例子详细说明了基于synchronized与wait() notify()实现并发协同时出现的伪唤醒问题,以及列举了几种常见的synchronized加锁失效的问题,希望通过论证这些问题能让大家进一步掌握synchronized与wait() notify()的应用。
如果文章对您有帮助,不妨“帧栈”一下,关注“帧栈”公众号,第一时间获取推送文章,您的肯定将是我写作的动力!