rabbitmq学习

基础

direct模式

请添加图片描述
生產端創建並綁定交換機和隊列,其中重點關注queue的名字,exchange的名字,routingKey的名字

    @Bean
    public Queue rabbitmqDemoDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
         * */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }
    
    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }
    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoDirectQueue())
                //到交换机
                .to(rabbitmqDemoDirectExchange())
                //并设置匹配键
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }

生產端發送消息,填寫exchange的名字,routingKey的名字和需要發送的msg

rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);

消費端監聽queue的名字獲取msg

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {}

fanout

相當於廣播
请添加图片描述
生產端綁定exchange到兩個queue,不需要設置routingKey

    @Bean
    public Queue fanoutExchangeQueueA() {
        //队列A
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
    }

    @Bean
    public Queue fanoutExchangeQueueB() {
        //队列B
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        //创建FanoutExchange类型交换机
        return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindFanoutA() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
    }

    @Bean
    public Binding bindFanoutB() {
        //队列B绑定到FanoutExchange交换机
        return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
    }
    
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //启动项目即创建交换机和队列
        rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
        rabbitAdmin.declareQueue(fanoutExchangeQueueB());
        rabbitAdmin.declareQueue(fanoutExchangeQueueA());
        return null;
    }

因爲是廣播,所以生產端發送msg只需要填寫exchange和msg

rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);

消費端監聽queue

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列A收到消息:" + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列B收到消息:" + map.toString());
    }
}

topic

通過設置routingKey的匹配模式,匹配到相應的queue,根據routingKey,可以發給多個queue,也可以發給單個queue
请添加图片描述

@Bean
    public TopicExchange rabbitmqDemoTopicExchange() {
        //配置TopicExchange交换机
        return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Queue topicExchangeQueueA() {
        //创建队列1
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueB() {
        //创建队列2
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueC() {
        //创建队列3
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
    }

    @Bean
    public Binding bindTopicA() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueB())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicB() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueC())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicC() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueA())
                .to(rabbitmqDemoTopicExchange())
                .with("rabbit.#");
    }
    
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange());
        rabbitAdmin.declareQueue(topicExchangeQueueA());
        rabbitAdmin.declareQueue(topicExchangeQueueB());
        rabbitAdmin.declareQueue(topicExchangeQueueC());
        return null;
    }
}

生產端填寫exchange,routingKey,msg

rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);

消費端監聽queue

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列A收到消息:" + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列B收到消息:" + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_C))
public class FanoutExchangeConsumerC {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列C收到消息:" + map.toString());
    }
}

這時如果routingKey為rabbit.abcd則匹配到C
如果routingKey為a.b則匹配到A和B

header

header模式會根據msg的類型自動路由
生產端綁定

    @Bean
    public Binding bindHeadersA() {
        Map<String, Object> map = new HashMap<>();
        map.put("key_one", "java");
        map.put("key_two", "rabbit");
        //全匹配
        return BindingBuilder.bind(headersQueueA())
                .to(rabbitmqDemoHeadersExchange())
                //全匹配
                .whereAll(map).match();
				//或者任意匹配
				//.whereAny(map).match();
    }

生产端发送消息

rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
消息主体{"key_one":"java", "key_two":"rabbit"}

会被消费端匹配

容易产生的问题

1消息堆积

当生产者生产的速度大于消费者的时候就会产生堆积的问题,这个时候如果不是生产者的问题,就需要扩大消费者的规模,或者将新的消息转移到别的队列中

消费者配置线程数量和最大线程数量

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_WORK_AI_TASK = "queue_work_ai_task";
    // 声明交换机
    // 声明队列
	@Bean("AIContainerFactory")
	public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, 
		ConnectionFactory connectionFactory) {
   		SimpleRabbitListenerContainerFactory container= new SimpleRabbitListenerContainerFactory();
   		container.setConcurrentConsumers(5);
   		container.setMaxConcurrentConsumers(10);
  		configurer.configure(factory, connectionFactory);
  		// 是否重回队列
        container.setDefaultRequeueRejected(true);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
   		return factory;
 	}
 	
}

2消息丢失

由于宕机或者其他某些原因

1.消息在生产端丢失
2.消息在队列中丢失
3.消息在消费端丢失

1.解决办法:事务和confirm模式
事务很好理解,生产端失败可以回滚。
confirm模式是生产端给消息加上唯一的编号,queue收到数据后返回ack确认。
这样创建confirm模式

ConnectionFactory factory = new ConnectionFactory();
Connection connection = null;
// 创建信道
Channel channel = null;
// 创建连接
connection = factory.newConnection();
// 创建信道
channel = connection.createChannel();
channel.confirmSelect();

或者这样配置
在这里插入图片描述

2.解决办法:持久化队列
参数1队列名称,参数2是否持久化,参数3是否独占模式,参数4是否自动删除

Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);

3.解决办法:消费端宕机后数据无法恢复,是没有设置手动确认导致的,使用如下方法设置:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //此处也设置为手动ack
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
    
	public enum AcknowledgeMode {
    	NONE,
    	MANUAL,
    	AUTO;
	}

消费端手动确认代码如下

 @RabbitListener(queues = RabbitExChangeConfig.DEAD_LETTER_QUEUE, containerFactory = "simpleRabbitListenerContainerFactory")
    public void reciveDeadLetter(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
        long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
 
            System.out.println("死信队列消费者收到消息 : " + new String(message.getBody(), "UTF-8"));
            /**
             * 手动ack
             * deliveryTag:该消息的index
             * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
             */
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            //消息退回 (可以在可视化界面看到)
            //批量退回 退回之后重回消息队列 true  false的话就是丢弃这条信息,如果配置了死信队列,那这条消息会进入死信队列
            channel.basicNack(deliveryTag, false, true);
            //单条退回 channel.basicReject();
        }
    }

如果想使用数据库记录生产者生产失败的记录,消费者消费失败的记录,配置文件按照如下配置

	@PostConstruct
    public void initRabbitTemplate(){
        /**
         * @param correlationData  当前消息的唯一关联数据  ,这个是消息的唯一id
         * @param ack 消息是否成功收到
         * @param cause 失败原因
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack){
                    //修改数据库
                    aiTaskService.update(new UpdateWrapper<AiTask>()
                            .eq("id",correlationData.getId())
                            .set("cause",cause)
                            .set("state", AiTask.AI_WORK_FAILURE));
                }
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就会触发这个失败回调
             * @param message   投递失败的消息
             * @param replyCode  回复的状态码
             * @param replyText  回复的文本内容
             * @param exchange  消息发送的交换机
             * @param routingKey  消息走的路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //修改数据库
                String s = new String(message.getBody());
                AiMqMessageVo aiMqMessageVo = JSONObject.parseObject(s, AiMqMessageVo.class);
                aiTaskService.update(new UpdateWrapper<AiTask>()
                        .eq("id",aiMqMessageVo.getId())
                        .set("cause","未成功投递给指定队列。信息 - > 状态码:"+replyCode + " 文本内容:"+replyText  + "  交换机:"+exchange  + "  路由:"+routingKey)
                        .set("state", AiTask.AI_WORK_FAILURE));
            }
        });
    }
 }

3重复消费

一般情况下消费端消费过后需要发送确认消息给队列,如果由于某些意外原因比如网络故障,队列没有收到消息。就会导致队列将消息重新发送给其他消费者,就会导致重复消费。或者,消费之后还没来得及返回确认消息就重启了等等情况。

既然mq端我们无法解决,那就在消息上下功夫。
解决办法:

1.消息设置唯一的ID作为数据库主键id,这样在insert的时候如果重复消费就会报错。

2.Redis 设置全局唯一id
每次生产者发送消息前设置一个全局唯一id放在消息体重,并存放的 redis 里,在消费端接口上先找在redis 查看是否存在全局id,如果存在,调用消费接口并删除全局id,如果不存在,不做后续操作。

生产端在redis中添加id的值到集合当中,并把id添加到消息中。

private static int id;
jedis.sadd("id", id);
Map<String, String> map = new HashMap<>();
map.add("id", id++);
//其他逻辑
//发送消息

消费端

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("topicDemo1"))
    public void process(Map<String, String> msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            //操作redis, 解决重复消费问题
            String id = msg.get("id");
            Set<Integer> allId;
            if(!(allId = jedis.get("id")).contains(id){
                System.out.println("重复消费");
                return;
            }else{
            	//从集合中删除
            	allId.srem(id);
            }
            //其他逻辑
        }
    }

4死信队列

队列中已经死亡的信息,顾名思义,这些信息已经不会被消费者消费
有几种可能
1消息被拒绝(basic.reject / basic.nack),并且requeue = false
2消息TTL过期
3队列达到最大长度

如何解决呢
1.丢弃,如果不是很重要,可以选择丢弃
2.记录死信入库,然后做后续的业务分析或处理
3.通过死信队列,由负责监听死信的应用程序进行处理

这里采用的是配置死信队列的方式
详见:https://blog.csdn.net/doupengzp/article/details/106929216

rabbitmq实现分布式事务

场景如下,用户在订单中心生成订单修改数据库,同时订单要同步到运单中心。
在这里插入图片描述
如果只在订单中心写事务的逻辑,当回滚操作时,运单中心就无法保障了
在这里插入图片描述
于是我们使用rabbitmq中间件,对于消息进行异步暂存
在这里插入图片描述

  1. 在订单系统数据库中增加消息状态表
    在这里插入图片描述

  2. 生产端开启confirm机制,当消息确实送到queue中返回ack,并修改本地消息状态表。如果发送失败,定期查询数据库消息状态表,重新发送。
    在这里插入图片描述

  3. queue开启持久化,防止宕机消息丢失。
    防止重复消费,数据库中使用唯一id进行鉴别。

  4. 消费者开启手动ack,当消息处理失败的时候,queue可以重新发送。消费端处理成功时再从queue中移除

  5. 消费者重复消费多次仍然失败,则运维人员处理,或者移交到死信队列,对订单中心进行回滚操作。

其他

負載均衡

CAP理论

一致性
可用性
分区容错性

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值