一 引入
在我们的日常开发中,消息中间件已经成为了java研发工程师的一项必备技能,本文主要是基于对springboot原生组件的扩展开发,基于模板设计模式和静态代理模式,简化了队列路由的绑定,交由公共模板进行统一的绑定,并在公用模板中保证了消息的幂等性和消息的可靠性投递,将这些类似的代码抽离出来,让开发者只专注于业务逻辑的开发.
整体实现思路:
- 开发者申明路由交换机等基础元数据后,交由元数据解析器完成交换机,路由的申明及相应关系的绑定
- 开发者申明消息监听器后,通过消息侦听容器完成队列与监听器的绑定
- 开发者通过消息发射器,发布消息后,系统通过不通模板完成消息的发送
二 逻辑实现
2.1 元数据解析器的构建
申明基础信息接口
/**
* 用于定义交换机队列等核心参数 便于队列交换机等初始化
* @author likun
* @date 2022/6/17 11:22
*/
public interface MessageMetaData {
/**
* 获取队列名称
* @return
*/
String getQueue();
/**
* 交换机类型
* @return
*/
ExchangeTypeEnum getExchangeType();
/**
* 队列配置
* @return
*/
default Map<String,Object> getQueueArgs(){
return null;
};
/**
* 交换机配置
* @return
*/
default Map<String,Object> getExchangeArgs(){
return null;
};
/**
* 消息扩展属性
* @return
*/
default MessageProperties getMessageProperties(){
return null;
};
default void setMessageProperties(MessageProperties messageProperties){
};
}
定义不同的元数据模板
public abstract class FanoutMessageMetaData implements MessageMetaData {
private MessageProperties messageProperties = null;
@Override
public ExchangeTypeEnum getExchangeType() {
return ExchangeTypeEnum.FANOUT;
}
/**
* 交换机名称
* @return
*/
abstract public String getExchange();
@Override
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
@Override
public void setMessageProperties(MessageProperties messageProperties) {
this.messageProperties=messageProperties;
}
}
public abstract class DirectMessageMetadata implements MessageMetaData{
private MessageProperties messageProperties;
@Override
public ExchangeTypeEnum getExchangeType() {
return ExchangeTypeEnum.DIRECT;
}
/**
* 交换机名称
* @return
*/
abstract public String getExchange();
@Override
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
@Override
public void setMessageProperties(MessageProperties messageProperties) {
this.messageProperties=messageProperties;
}
}
申明元数据解析接口,定义解析规范
/**
* 判断是否支持当前交换机类型
* @author likun
* @date 2022/6/17 11:44
*/
public interface Support {
Boolean support(ExchangeTypeEnum exchangeTypeEnum);
}
/**
* 核心参数解析器 解析核心参数并完成队列交换机的绑定
* @author likun
* @date 2022/6/17 11:42
*/
public interface MessageMetaDataResolver extends Support {
/**
* 解析核心参数
* @param messageMetaData
*/
void resolve(MessageMetaData messageMetaData);
}
定义不同模板的解析器
@RequiredArgsConstructor
@Slf4j
public abstract class AbstractMessageMetadataResolver implements MessageMetaDataResolver {
private final RabbitAdmin rabbitAdmin;
@Override
public void resolve(MessageMetaData messageMetaData) {
if (support(messageMetaData.getExchangeType())){
doResolve(messageMetaData);
}
}
/**
* 下游子类实现
* @param messageMetaData
*/
abstract void doResolve(MessageMetaData messageMetaData);
/**
* 申明队列
*/
public void declareQueue(Queue queue){
rabbitAdmin.declareQueue(queue);
log.info("queue [{}] declared.",queue);
}
public void declareExchange(Exchange exchange){
rabbitAdmin.declareExchange(exchange);
log.info("exchange [{}] declared.",exchange);
}
public void declareBinding(Binding binding){
rabbitAdmin.declareBinding(binding);
log.info("binding [{}] declared.",binding);
}
}
public class DirectMessageMetadataResolver extends AbstractMessageMetadataResolver{
public DirectMessageMetadataResolver(RabbitAdmin rabbitAdmin) {
super(rabbitAdmin);
}
@Override
public Boolean support(ExchangeTypeEnum exchangeTypeEnum) {
return ExchangeTypeEnum.DIRECT.equals(exchangeTypeEnum);
}
@Override
void doResolve(MessageMetaData messageMetaData) {
DirectMessageMetadata directMessageMetadata = (DirectMessageMetadata) messageMetaData;
// 申明队列
Queue queue = new Queue(directMessageMetadata.getQueue(), true, false, false, directMessageMetadata.getQueueArgs());
declareQueue(queue);
// 申明交换机
DirectExchange exchange = new DirectExchange(directMessageMetadata.getExchange(), true, false, directMessageMetadata.getExchangeArgs());
declareExchange(exchange);
// 交换机绑定队列
Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();
declareBinding(binding);
}
}
public class FanoutMessageMetaDateResolver extends AbstractMessageMetadataResolver{
public FanoutMessageMetaDateResolver