该问题涉及和引申的点:
1、wait、notify方法配合的正确方式
2、线程的状态流转
3、阻塞或繁忙线程的定位方式
4、grpc的源码跟踪
一、第一章问题现象和背景描述
上一个大版本的分支产品在线上上出现新增设备一直卡住的现象,客户端调用始终不返回,导致客户加不了设备。
另外,还发现该进程的cpu利用率一直在400%左右。
二、问题初步分析
接到该问题就比较疑惑,之前资源模块也没有出现类似问题,看这现象基本可以断定是线程被block住了,难道资源模块一个隐藏很深的bug。
三、问题定位
四、第一章问题现象和背景描述
带着自己的疑问,首先想到的是使用arthas的thread –b命令,偷个懒去快速定位阻塞线程在哪里,根据结果并没有发现我负责模块的阻塞情况,反而看到了告警模块的block堆栈信息(未保留截图)。
那么具体是怎么阻塞的呢?
实在没看出来只有用jstack工具了。和同事dump了整个进程的堆栈信息,全文搜索“BLOCK”结果,可以发现非常多的BLOCKED线程:
我们点进去检查是否有我们相关堆栈的,找到:
可以看到整个线程是“BLOCKED”状态,且是一个monitor-entry的获取状态,说明这里使用了synchronized关键字做了同步限制,该线程等待的锁对象为“0x000000073f951208”,那么我们再搜索整个锁对象,查看该对象具体是被哪个线程锁持有没有释放,那么再根据对应线程的堆栈信息就可以定位到阻塞的关键点了,搜索锁对象“0x000000073f951208”结果如下:
可以清楚的看到这个所对象有很多线程都在等待获取,定位到当前拥有该锁对象的详细堆栈:
堆栈从下到上,业务流程为:
1、保存设备信息
2、修改设备信息
3、发送设备状态告警
4、调用grpc发送状态告警
5、卡在grpc的ClientCalls#blockingUnaryCall方法内部
然后看了一下变更,只有此处是使用的grpc进行模块间调用,一面在纳闷为何此处不和别处保持一致,直接用api调用呢,进程间调用为何要搞这么麻烦,此处虽有疑问但是还没有想通。在自己的认知内,rpc怎么会一直卡住呢,怎么着都有连接超时和读取超时的限制,一般也不会配置的太长,就算是有问题也不至于导致线程卡住超过几个小时。于是,向对接人下了一个现在看来大胆而又不太站得住立场的结论:该阻塞问题是由于进程间调用使用了grpc导致,具体grpc中为何出现该问题我不得而知。于是我们确认客户允许重启服务后重启了该进程,该问题恢复了。
现场问题处理了,下一步就是出补丁修复该问题,在下结论后就打算将rpc调用修改为本地api调用,这样应该就解决了。可第二天,仔细摸索了代码和模块依赖才发现,因为被调用的模块已经依赖了调用方,所以开发该功能的同事才会使用grpc的方式实现。我再看了附近的逻辑,都有同进程的grpc调用实现,为什么其他的逻辑没有遇到这个问题呢?
想到这里,就发现昨天下的结论是站不住脚的,同进程grpc调用可能没有超时时间一说,当然这一点也可以当做一个研究点深入了解一下,那么进一步定位根本原因。
既然不是grpc组件的问题,那么问题肯定就出在被调用方了,查找一下对应的堆栈信息,看下有没有蛛丝马迹,我调用的告警的方法名为“alarmDealWithToRear”,根据该关键字搜索如下,整个文件有24个之多:
随便点开一个堆栈都是清一色的“BLOCKED”状态:
现在根据该堆栈中的所对象再定位,可以看到该锁对象被一个线程持有,其他线程都在等待获取该锁所以处于“BLOCKED”状态:
点开等待锁和持有锁的线程的详细堆栈:
可以确认造成阻塞的罪魁祸首为AlarmDealThread这个锁对象,到这里我们就可以结合代码去分析了。
关键代码为:
调用alarmDealWithToRear方法尝试加入队列并唤醒处理线程,这里使用了synchronized关键字,和堆栈一致。那么就是处理线程一直没有释放锁导致的,左边这里线程一跑起来就获取锁,很明显是有问题的,如果里面的循环一直不停,右边则始终获取不到锁。那么是什么原因导致左边的内部循环不停止呢?那就是alarmInfoQueue这个队列始终不为空,走不到wait方法去释放锁,很明显就是下面的dealAlarm方法抛出异常,但是该元素又没有出队,所以一直循环执行。这也是为什么,阻塞的同时还伴随着cpu利用率高到400的原因。
至此,根本问题定位,修改点为:
1、修改线程中加锁的位置,放到wait()方法外面
2、dealAlarm方法中如果处理失败,尝试将处理失败的元素移除队列,防止一直循环处理(当然更应该去定位具体异常的原因并解决,这里由于时间关系没有做)
总结
1、定位问题一定要定位到直接原因和根本原因,下结论不可大意
2、根据BLOCKED的线程锁等待的锁对象,去定位持有锁的线程
3、这个问题的处理过程中,还可以去深入研究grpc的内部逻辑实现
4、附录为问题代码的复刻版本,可以作为参考
5、Notify方法和wait方法配合的正确打开方式
6、如果实现批量的生产-消费模式,可以使用定时线程池和阻塞队列的阻塞take方法来实现
附录 问题代码段的抽取
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class BlockTest {
private static final LinkedBlockingQueue<Element> QUEUE = new LinkedBlockingQueue<>();
static final DealThread dealThread = new DealThread();
static {
dealThread.start();
}
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
long random = Math.round(1000D);
// 构造属性为null的元素
Element ele = new Element(random % 2L == 0 ? null : random);
System.out.println("入队元素:" + ele);
QUEUE.add(ele);
// 如果线程是等待状态则唤醒
if (dealThread.getState() == Thread.State.WAITING) {
synchronized (dealThread) {
dealThread.notify();
}
}
}
}).start();
}
static class DealThread extends Thread {
@Override
public void run() {
// 问题出在这里
synchronized (this) {
for (; ; ) {
try {
// 当队列为空,挂起
if (QUEUE.isEmpty()) {
System.out.println("队列没有数据,暂时挂起");
this.wait();
}
deal();
} catch (Exception e) {
System.err.println("循环异常:" + e.getMessage());
}
}
}
}
private void deal() {
List<Element> batch = QUEUE.stream().limit(100).collect(Collectors.toList());
try {
for (Element element : batch) {
System.out.println(element.getValue().toString());
}
if (!QUEUE.isEmpty()) {
QUEUE.removeAll(batch);
}
} catch (NullPointerException e) {
System.err.println("处理错误:" + e.getMessage());
QUEUE.remove(null);
}
}
}
static class Element {
private Long value;
public Element(Long value) {
this.value = value;
}
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
@Override
public String toString() {
return "Element{" + "value=" + value + '}';
}
}
}
附录 生产-消费模式
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* 通过定时线程池或阻塞队列的特性去控制
*/
public class SyncTest {
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("xxx-thread-%d").build();
public static void main(String[] args) {
/**
* 定时线程池
*/
// AbortPolicy 终止策略(抛出RejectedExecutionException异常)
// 定时地址池需要注意异常捕获
ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(2, threadFactory, new ThreadPoolExecutor.AbortPolicy());
schedulePool.scheduleAtFixedRate(() -> {
System.out.println("do task");
}, 5, 5, TimeUnit.SECONDS);
// DiscardPolicy 丢弃策略
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3000), threadFactory, new ThreadPoolExecutor.DiscardPolicy());
// execute和submit的区别是什么?
threadPool.execute(() -> System.out.println("runnable"));
threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
/**
* 阻塞队列方式
*/
LinkedBlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>();
// 生产者
threadPool.execute(() -> {
for (int i = 0; i < 10; i++) {
Object ele = new Object();
System.out.println("生产元素:" + ele);
synchronized (blockingQueue) {
blockingQueue.offer(ele);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
});
// 消费者
threadPool.execute(() -> {
for (; ; ) {
try {
List<Object> batch = new ArrayList<>();
batch.add(blockingQueue.take());
// 如果拿到一个元素,则等待一定时间,将这段时间内的元素合为一批次处理
TimeUnit.SECONDS.sleep(3);
synchronized (blockingQueue) {
while (!blockingQueue.isEmpty()) {
batch.add(blockingQueue.take());
}
}
System.out.println("消费元素:" + batch);
} catch (InterruptedException e) {
System.err.println("消费异常:" + e.getMessage());
}
}
});
}
}