guava工具包中的AsyncEventBus原理解析

1.AsyncEventBus主要概念

事件发送方异步发送消息,消费者会从线程池获得一个新线程来异步执行消费逻辑

① Event

事件主体,就是要发的消息内容

②SubscriberRegistry: 

单个事件总线的订阅者注册表:

因为可以有多个事件订阅者,所有的订阅者都注册在这里面

 register方法注册相应的订阅者,主要是用Multimap来存储,消息体class类型做为key,value为订阅封装类,订阅封装类里面有消息总线bus的信息,监听者类,方法 和线程池 ,并且可以有多个

如果监听者类里面有两个相同参数类的方法,那么两个方法都会接收到同样的消息

这是根据消息发送的时候根据消息体来找到对应的订阅者方法

③Dispatcher:  事件分发器

有三个子类:

LegacyAsyncDispatcher:  该事件分发器是AsyncEventBus用的

会将所有的消息和订阅者封装后放在ConcurrentLinkedQueue 队列中

该队列是: 基于链接节点的无界线程安全队列。此队列按照FIFO(先进先出)原则对元素进行排序。队列的头部是队列中存在时间最长的元素,而队列的尾部则是最近添加的元素。新的元素总是被插入到队列的尾部,而队列的获取操作(例如pollpeek)则是从队列头部开始。

通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理

然后执行入队,出队流程

消息出队后会执行dispatchEvent方法

该方法内就是从事件线程池中,利用反射执行订阅类的监听方法,传入参数就是消息体

PerThreadQueuedDispatcher: 单线程同步的会将所有事件放在ArrayDeque按照顺序分发给订阅者

ImmediateDispatcher: 没有队列,有消息立马发送给监听者

二. 使用教程:

1.创建AsyncEventBus:

构造器传入参数:

指定唯一的bus名称,线程池,指定的异常处理类, async指定的消息分发器LegacyAsyncDispatcher:

2.创建完成AsynEventBus对象后,调用register方法注册相应的订阅者

    @Bean
    public AsyncEventBus policyStatusAsyncEventBus(){
        AsyncEventBus asyncEventBus = new AsyncEventBus("status-asyncEventBus", ThreadPoolHolder.POLICY_STATUS_POOL_EXECUTOR);
        asyncEventBus.register(statusAsyncBusListener);
        return asyncEventBus;
    }

3.创建订阅者对象

@Slf4j
@Component
public class StatusAsyncBusListener {

    @Subscribe
    public void statusListen(StatusMsgDTO statusMsgDTO){
       log.info("收到消息 {}", JSON.toJSONString(statusMsgDTO));
    }
}

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值