RabbitMQ札记-路由

在上文中,我们创建了一个简单的日志系统,它可以将消息广播给多个Consumer。在本文中,我们将为其添加一个功能。Consumer将只能订阅一部分消息。

绑定

绑定是exchange和queue之间的关系。在前面的例子中,我们已经创建了绑定,queueName为队列的名字,“logs”为Exchange的名字。

channel.queueBind(queueName,"logs","");

绑定可以采用额外的routingKey参数。

channel.queueBind(queueName,"logs",routingKey);

对于fanout的exchange来说,这个参数是被忽略的。

direct exchange

在RabbitMQ札记-RabbitMQ入门一文中,我们曾学习过RabbitMQ的概念模型,其中就介绍过Exchange分发策略中的direct。direct意为如果Routing key匹配, 那么Message就会被传递到相应的Queue中。比如Routing key为key的Exchange可以分发消息到Routing key为key的Queue。
MarkdownPhotos/master/CSDNBlogs/RabbitMQ/directExchangeModel.png
从图中我们可以看到两个队列绑定的直接交换。第一个队列用绑定键key1绑定,第二个队列有两个绑定,一个绑定键为key2,另一个为key3。通过路由键key1发布到Exchange的消息将被路由到队列Queue1。通过路由键key2或key3发布到Exchange的消息将被路由到队列Queue2。其他消息将被丢弃。

多个绑定

绑定多个队列是完全合法的。
MarkdownPhotos/master/CSDNBlogs/RabbitMQ/multiBindExchangeModel.png
在这种情况下,直接交换就像广播一样,将消息广播到所有的匹配队列。

最终版本

Producer.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

	private final static String EXCHANGE_NAME = "direct_logs";
	private final static String ROUTING_KEY = "key1";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 创建一个到服务器的连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setUsername("yst");
		factory.setPassword("yst");
		factory.setHost("192.168.17.64");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();

		//发布消息到我们的exchange,而不是队列
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

		// 发送的消息
		String message = "Hello World.";
		// 发消息
		channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
		System.out.println("Sent:" + ROUTING_KEY + ":" + message);

		// 关闭渠道和连接;
		channel.close();
		conn.close();
	}
}

Consumer.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

	private final static String EXCHANGE_NAME = "direct_logs";
	private final static String ROUTING_KEY = "key1";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 创建一个到服务器的连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setUsername("yst");
		factory.setPassword("yst");
		factory.setHost("192.168.17.64");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		//获取channel绑定的队列的名字
		String queueName = channel.queueDeclare().getQueue();
		//将queue与exchange绑定
		channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);

		System.out.println("Waiting for messages.");

		// 创建队列消费者
		final DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received:" + envelope.getRoutingKey() + "':'" + message);
			}
		};
		channel.basicConsume(queueName, true, consumer);
	}
}

Consumer2.java
将Consumer.java复制一份,命名为Consumer2.java。将其中的ROUTING_KEY改为“key2”。

运行两个Consumer实例,称其为C1和C2,运行一个Consumer2实例,称其为C3。运行Producer发送消息。观察运行结果
C1:

Waiting for messages.
Received:key1':'Hello World.

C2:

Waiting for messages.
Received:key1':'Hello World.

C3:

Waiting for messages.

结果验证了上述直接交换和多个绑定中的内容。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值