说明:
(1)本篇博客主要介绍RabbitMQ交换机的内容,包括fanout模式,direct模式,topic模式,headers模式等;
目录
一:RabbitMQ的交换机,在前面粗浅的介绍其概念,但我们没有详细介绍:
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】;
2.创建ReceiveLog类,这个类可以认为是一个存储日志的类;其角色是【Consumer消费者】;
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】;
2.创建ReceiveLog1类,这个类可以认为是【把日志打印在控制台】那个接收日志信息的类;其角色是【Consumer消费者】;
3.创建ReceiveLog2类,这个类可以认为是【把日志写在日志文件中】那个接收日志信息的类;其角色是【Consumer消费者】;
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】;
2.创建Receive1类,其角色是【Consumer消费者】;然后,其接收消息时,设置的Routing Key是【*.orange.*】;
3.创建Receive2类,其角色是【Consumer消费者】;然后,其接收消息时,设置的Routing Key是【*.*.rabbit】和【lazy.#】;
一:RabbitMQ的交换机,在前面粗浅的介绍其概念,但我们没有详细介绍:
(1)在【RabbitMQ入门1:RabbitMQ简介;(消息队列简介;RabbitMQ简介;)】中,简单介绍了交换机的概念;
● 交换机的主要作用是,Producer在发送消息时候,是通过交换机向对应的队列中发送消息的;
● 然后,可以使用RoutingKey来指定,交换机和哪些队列绑定;即,Routing Key路由键就是专用于决定交换机和队列是如何进行绑定的;
简单来说,消息发送方即Producer生产者,在发布消息时候,可以设置交换机和路由键,以把消息发送到指定的队列中去;即,交换机的主要作用是,Producer生产者发布消息用的;
(2)在【RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;】中,演示通过客户端连接使用RabbitMQ的时候,也涉及到了交换机的一点内容;
●在这儿,我们发布消息的时候,没有指定交换机(即使用的是RabbitMQ的默认交换机);如果我们没有指定交换机,那么我们就需要设置RoutingKey,以让RabbitMQ知道,Producer发送的消息要发送到那个(些)队列上去;
二:RabbitMQ中,交换机的四种工作模式:简述;
(1)fanout:广播模式的交换机;
● 即,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机,那么RabbitMQ会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息;
● 所以,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机;其发布消息的时候,就不用设置RoutingKey了;因为,其会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息,就不需要RoutingKey来指定向哪些队列发送消息了;
(2)direct:直接模式;
● 其会根据,RoutingKey路由键,去决定交换机把消息发送到哪个(些)队列上去;
● Producer在发送消息时,如果使用direct模式;那么,我们在发送消息的时候,就需要指定一个Routing Key(否则交换机,将不知道把消息发送到哪个(些)队列上去);
● 然后Consumer在接收消息的时候,需要创建一个队列;然后,Consumer要想使用队列接收到某个消息,那么在通过队列接受消息时,就需要设置对应的Routing Key;
(3)topic模式;
● Topic模式可以认为是direct模式的一个威力加强版;
● Topic之所以,可以应对“多个条件”的复杂逻辑;主要是其可以进行模糊匹配;
● 其中,*可以匹配任何一个单词(不包含"."的任意字符串);例如,item.*可以匹配item.pikapika,或者item.&gdygj。diu,或者item.ji。。joidjfi%djl但不能匹配item.ji。。joidjf.i%djl(实测如此;;;;;;貌似在RabbitMQ的Topic模式中,"."是一种特殊的存在,其可以作为分隔符,每隔一个"."就表示多了一个单词);
#可以匹配0至多个单词;例如:item.# 能够匹配 item.insert.abc 或者 item.insert;(实测如此)
● 关于,topic模式,可以看下下面的案例;
三:交换机类型之:fanout广播模式;
0.问题引入;
(1)一个适合使用fanout模式交换机的,场景;
试想一下这种场景:我们要构建一个日志记录系统,其情况和需求如下:
● 其中,比如我们的【业务代码】会发出日志消息,然后我们的【日志记录代码】负责接收并存储日志消息;
● 然后,同时既希望把日志存到磁盘上,同时也希望把日志打印到屏幕上;
● 那么,对于这种需求,我们就需要有多个接收日志的地方;即,也就是,我们有一个发送日志消息的,但却有多个接收日志消息的(对于同样的一条消息,这个多个接收日志消息的地方,都需收到);
那么,对于这种需求,那么Producer在发布消息的时候,使用fanout模式的交换机,就很适合;
● 如果Producer生产者在发布消息的时候,使用fanout模式交换机,会有如下特点:如果Producer生产者,在发布消息的时候,采用fanout模式的交换机;那么,在发布消息时,那么所有与这个交换机绑定的队列(一般这个队列声明在Consumer消费者处),都会收到这个消息;
● 即,Producer生产者在发布消息的时候,如果采用fanout模式的交换机;那么,其发布消息的时候,会向RabbitMQ中、所有、与这个交换机绑定的,队列中,都发送这个消息;
● 如果,RabbitMQ中、当前、没有与这个交换机绑定的队列,那么消息就相当于是一个无的之矢,这个消息就会在设定的存活时间内容,静静的等待;一旦有【与这个交换机绑定的队列】被创建后,这个消息就会把发送到那个队列中去;(PS:这一点,可能存在不准确、甚至错误的地方;但是,这也是自己经过实测的……)
……………………………………………………
(2)关于日志存储,还有第二个需求;
● 需求:由于日志存储这个业务的特殊性,我们只想记录当前产生的日志,即【日志记录代码】只需要记录【日志记录代码启动后,业务代码产生的日志】;(因为,以前产生的日志,自有当时的日志记录代码负责;;;;同时,日志也有时效性,记录一些旧日志的意义不是很大;;;);
● 那么,针对这个需求,Consumer在接收消息的时候,就可以使用临时队列;(有关临时队列的内容,在下面会介绍)
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】;
EmitLog类:
package fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:发送日志信息; */ public class EmitLog { //定义交换机的名字; private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //模拟一下,日志的内容 String message = "info:hello world"; //6.发布消息 //参数说明:第一个参数(exchange)是交换机; // 第二个参数(routingKey)是路由键,这儿因为交换机是fanout模式,所以路由键不需要写; //第三个参数(props),消息除了消息体外,还要有props作为它的配置; // 第四个参数(body)消息的内容,要求是byte[]类型的,同时,需要指定编码类型 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("发送了消息:" + message); channel.close(); connection.close(); } }
说明:
(1)类内容说明;
(2)关于fanout交换机,和,RabbitMQ默认交换机,在使用时的区别;
● 在【RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;】中,我们一开始演示RabbitMQ的时候,并没有主动去关心交换机的内容,当时使用的是默认交换机;;;所以,在当时我们发送消息的时候,为了能让RabbitMQ知道要被这个消息发送到哪个(些)队列中,我们需要指定Routing Key;;;然后,当时,我们在Producer生产者中就定义了一个队列,然后把这个队列作为RoutingKey,就相当于告诉RabbitMQ,我们发送的这个消息就是发送到这个队列中的;;;自然,RabbitMQ通过默认交换机,把消息发送到那个队列中后,这个消息就在队列中了;;;如果,当前没有任何Consumer消费者连接这个队列,那么这个队列中的消息,就会默默的等待;;;如果,有Consumer消费者连接了这个队列,那么Consumer就可以从这个队列中获取消息;;;而且,使用默认交换机的时候,RabbitMQ会采用分发的方式,把队列中的消息,分发给所有(连接了这个队列的)的Consumer,每个消息只会发送给一个Consumer(比如,有1,2,3,4四条消息;然后,有两个队列;那么第一个队列会分到1,3两条消息,第二个队列会分到2,4两条消息);;;这在 【RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;】有着很明显的体现;