基础
direct模式
生產端創建並綁定交換機和隊列,其中重點關注queue的名字,exchange的名字,routingKey的名字
@Bean
public Queue rabbitmqDemoDirectQueue() {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* */
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
}
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindDirect() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDemoDirectQueue())
//到交换机
.to(rabbitmqDemoDirectExchange())
//并设置匹配键
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}
生產端發送消息,填寫exchange的名字,routingKey的名字和需要發送的msg
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
消費端監聽queue的名字獲取msg
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public void process(String msg, Message message, Channel channel) {}
fanout
相當於廣播
生產端綁定exchange到兩個queue,不需要設置routingKey
@Bean
public Queue fanoutExchangeQueueA() {
//队列A
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
}
@Bean
public Queue fanoutExchangeQueueB() {
//队列B
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
}
@Bean
public FanoutExchange rabbitmqDemoFanoutExchange() {
//创建FanoutExchange类型交换机
return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
}
@Bean
public Binding bindFanoutA() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
}
@Bean
public Binding bindFanoutB() {
//队列B绑定到FanoutExchange交换机
return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//启动项目即创建交换机和队列
rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
rabbitAdmin.declareQueue(fanoutExchangeQueueB());
rabbitAdmin.declareQueue(fanoutExchangeQueueA());
return null;
}
因爲是廣播,所以生產端發送msg只需要填寫exchange和msg
rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
消費端監聽queue
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("队列A收到消息:" + map.toString());
}
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("队列B收到消息:" + map.toString());
}
}
topic
通過設置routingKey的匹配模式,匹配到相應的queue,根據routingKey,可以發給多個queue,也可以發給單個queue
@Bean
public TopicExchange rabbitmqDemoTopicExchange() {
//配置TopicExchange交换机
return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
}
@Bean
public Queue topicExchangeQueueA() {
//创建队列1
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
}
@Bean
public Queue topicExchangeQueueB() {
//创建队列2
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
}
@Bean
public Queue topicExchangeQueueC() {
//创建队列3
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
}
@Bean
public Binding bindTopicA() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(topicExchangeQueueB())
.to(rabbitmqDemoTopicExchange())
.with("a.*");
}
@Bean
public Binding bindTopicB() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(topicExchangeQueueC())
.to(rabbitmqDemoTopicExchange())
.with("a.*");
}
@Bean
public Binding bindTopicC() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(topicExchangeQueueA())
.to(rabbitmqDemoTopicExchange())
.with("rabbit.#");
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange());
rabbitAdmin.declareQueue(topicExchangeQueueA());
rabbitAdmin.declareQueue(topicExchangeQueueB());
rabbitAdmin.declareQueue(topicExchangeQueueC());
return null;
}
}
生產端填寫exchange,routingKey,msg
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
消費端監聽queue
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("队列A收到消息:" + map.toString());
}
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("队列B收到消息:" + map.toString());
}
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_C))
public class FanoutExchangeConsumerC {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("队列C收到消息:" + map.toString());
}
}
這時如果routingKey為rabbit.abcd
則匹配到C
如果routingKey為a.b
則匹配到A和B
header
header模式會根據msg的類型自動路由
生產端綁定
@Bean
public Binding bindHeadersA() {
Map<String, Object> map = new HashMap<>();
map.put("key_one", "java");
map.put("key_two", "rabbit");
//全匹配
return BindingBuilder.bind(headersQueueA())
.to(rabbitmqDemoHeadersExchange())
//全匹配
.whereAll(map).match();
//或者任意匹配
//.whereAny(map).match();
}
生产端发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
消息主体{"key_one":"java", "key_two":"rabbit"}
会被消费端匹配
容易产生的问题
1消息堆积
当生产者生产的速度大于消费者的时候就会产生堆积的问题,这个时候如果不是生产者的问题,就需要扩大消费者的规模,或者将新的消息转移到别的队列中
消费者配置线程数量和最大线程数量
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_WORK_AI_TASK = "queue_work_ai_task";
// 声明交换机
// 声明队列
@Bean("AIContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory container= new SimpleRabbitListenerContainerFactory();
container.setConcurrentConsumers(5);
container.setMaxConcurrentConsumers(10);
configurer.configure(factory, connectionFactory);
// 是否重回队列
container.setDefaultRequeueRejected(true);
// 手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
2消息丢失
由于宕机或者其他某些原因
1.消息在生产端丢失
2.消息在队列中丢失
3.消息在消费端丢失
1.解决办法:事务和confirm模式
事务很好理解,生产端失败可以回滚。
confirm模式是生产端给消息加上唯一的编号,queue收到数据后返回ack确认。
这样创建confirm模式
ConnectionFactory factory = new ConnectionFactory();
Connection connection = null;
// 创建信道
Channel channel = null;
// 创建连接
connection = factory.newConnection();
// 创建信道
channel = connection.createChannel();
channel.confirmSelect();
或者这样配置
2.解决办法:持久化队列
参数1队列名称,参数2是否持久化,参数3是否独占模式,参数4是否自动删除
Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
3.解决办法:消费端宕机后数据无法恢复,是没有设置手动确认导致的,使用如下方法设置:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//此处也设置为手动ack
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
消费端手动确认代码如下
@RabbitListener(queues = RabbitExChangeConfig.DEAD_LETTER_QUEUE, containerFactory = "simpleRabbitListenerContainerFactory")
public void reciveDeadLetter(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
System.out.println("死信队列消费者收到消息 : " + new String(message.getBody(), "UTF-8"));
/**
* 手动ack
* deliveryTag:该消息的index
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//消息退回 (可以在可视化界面看到)
//批量退回 退回之后重回消息队列 true false的话就是丢弃这条信息,如果配置了死信队列,那这条消息会进入死信队列
channel.basicNack(deliveryTag, false, true);
//单条退回 channel.basicReject();
}
}
如果想使用数据库记录生产者生产失败的记录,消费者消费失败的记录,配置文件按照如下配置
@PostConstruct
public void initRabbitTemplate(){
/**
* @param correlationData 当前消息的唯一关联数据 ,这个是消息的唯一id
* @param ack 消息是否成功收到
* @param cause 失败原因
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack){
//修改数据库
aiTaskService.update(new UpdateWrapper<AiTask>()
.eq("id",correlationData.getId())
.set("cause",cause)
.set("state", AiTask.AI_WORK_FAILURE));
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就会触发这个失败回调
* @param message 投递失败的消息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 消息发送的交换机
* @param routingKey 消息走的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//修改数据库
String s = new String(message.getBody());
AiMqMessageVo aiMqMessageVo = JSONObject.parseObject(s, AiMqMessageVo.class);
aiTaskService.update(new UpdateWrapper<AiTask>()
.eq("id",aiMqMessageVo.getId())
.set("cause","未成功投递给指定队列。信息 - > 状态码:"+replyCode + " 文本内容:"+replyText + " 交换机:"+exchange + " 路由:"+routingKey)
.set("state", AiTask.AI_WORK_FAILURE));
}
});
}
}
3重复消费
一般情况下消费端消费过后需要发送确认消息给队列,如果由于某些意外原因比如网络故障,队列没有收到消息。就会导致队列将消息重新发送给其他消费者,就会导致重复消费。或者,消费之后还没来得及返回确认消息就重启了等等情况。
既然mq端我们无法解决,那就在消息上下功夫。
解决办法:
1.消息设置唯一的ID作为数据库主键id,这样在insert的时候如果重复消费就会报错。
2.Redis 设置全局唯一id
每次生产者发送消息前设置一个全局唯一id放在消息体重,并存放的 redis 里,在消费端接口上先找在redis 查看是否存在全局id,如果存在,调用消费接口并删除全局id,如果不存在,不做后续操作。
生产端在redis中添加id的值到集合当中,并把id添加到消息中。
private static int id;
jedis.sadd("id", id);
Map<String, String> map = new HashMap<>();
map.add("id", id++);
//其他逻辑
//发送消息
消费端
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("topicDemo1"))
public void process(Map<String, String> msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
//操作redis, 解决重复消费问题
String id = msg.get("id");
Set<Integer> allId;
if(!(allId = jedis.get("id")).contains(id){
System.out.println("重复消费");
return;
}else{
//从集合中删除
allId.srem(id);
}
//其他逻辑
}
}
4死信队列
队列中已经死亡的信息,顾名思义,这些信息已经不会被消费者消费
有几种可能
1消息被拒绝(basic.reject / basic.nack),并且requeue = false
2消息TTL过期
3队列达到最大长度
如何解决呢
1.丢弃,如果不是很重要,可以选择丢弃
2.记录死信入库,然后做后续的业务分析或处理
3.通过死信队列,由负责监听死信的应用程序进行处理
这里采用的是配置死信队列的方式
详见:https://blog.csdn.net/doupengzp/article/details/106929216
rabbitmq实现分布式事务
场景如下,用户在订单中心生成订单修改数据库,同时订单要同步到运单中心。
如果只在订单中心写事务的逻辑,当回滚操作时,运单中心就无法保障了
于是我们使用rabbitmq中间件,对于消息进行异步暂存
-
在订单系统数据库中增加消息状态表
-
生产端开启confirm机制,当消息确实送到queue中返回ack,并修改本地消息状态表。如果发送失败,定期查询数据库消息状态表,重新发送。
-
queue开启持久化,防止宕机消息丢失。
防止重复消费,数据库中使用唯一id进行鉴别。 -
消费者开启手动ack,当消息处理失败的时候,queue可以重新发送。消费端处理成功时再从queue中移除
-
消费者重复消费多次仍然失败,则运维人员处理,或者移交到死信队列,对订单中心进行回滚操作。
其他
負載均衡
CAP理论
一致性
可用性
分区容错性