1.AsyncEventBus主要概念
事件发送方异步发送消息,消费者会从线程池获得一个新线程来异步执行消费逻辑
① Event
事件主体,就是要发的消息内容
②SubscriberRegistry:
单个事件总线的订阅者注册表:
因为可以有多个事件订阅者,所有的订阅者都注册在这里面
register方法注册相应的订阅者,主要是用Multimap来存储,消息体class类型做为key,value为订阅封装类,订阅封装类里面有消息总线bus的信息,监听者类,方法 和线程池 ,并且可以有多个
如果监听者类里面有两个相同参数类的方法,那么两个方法都会接收到同样的消息
这是根据消息发送的时候根据消息体来找到对应的订阅者方法
③Dispatcher: 事件分发器
有三个子类:
LegacyAsyncDispatcher: 该事件分发器是AsyncEventBus用的
会将所有的消息和订阅者封装后放在ConcurrentLinkedQueue 队列中
该队列是: 基于链接节点的无界线程安全队列。此队列按照FIFO(先进先出)原则对元素进行排序。队列的头部是队列中存在时间最长的元素,而队列的尾部则是最近添加的元素。新的元素总是被插入到队列的尾部,而队列的获取操作(例如poll
或peek
)则是从队列头部开始。
通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理
然后执行入队,出队流程
消息出队后会执行dispatchEvent方法
该方法内就是从事件线程池中,利用反射执行订阅类的监听方法,传入参数就是消息体
PerThreadQueuedDispatcher: 单线程同步的会将所有事件放在ArrayDeque按照顺序分发给订阅者
ImmediateDispatcher: 没有队列,有消息立马发送给监听者
二. 使用教程:
1.创建AsyncEventBus:
构造器传入参数:
指定唯一的bus名称,线程池,指定的异常处理类, async指定的消息分发器LegacyAsyncDispatcher:
2.创建完成AsynEventBus对象后,调用register方法注册相应的订阅者
@Bean
public AsyncEventBus policyStatusAsyncEventBus(){
AsyncEventBus asyncEventBus = new AsyncEventBus("status-asyncEventBus", ThreadPoolHolder.POLICY_STATUS_POOL_EXECUTOR);
asyncEventBus.register(statusAsyncBusListener);
return asyncEventBus;
}
3.创建订阅者对象
@Slf4j
@Component
public class StatusAsyncBusListener {
@Subscribe
public void statusListen(StatusMsgDTO statusMsgDTO){
log.info("收到消息 {}", JSON.toJSONString(statusMsgDTO));
}
}