在上文中,我们创建了一个简单的日志系统,它可以将消息广播给多个Consumer。在本文中,我们将为其添加一个功能。Consumer将只能订阅一部分消息。
绑定
绑定是exchange和queue之间的关系。在前面的例子中,我们已经创建了绑定,queueName为队列的名字,“logs”为Exchange的名字。
channel.queueBind(queueName,"logs","");
绑定可以采用额外的routingKey参数。
channel.queueBind(queueName,"logs",routingKey);
对于fanout的exchange来说,这个参数是被忽略的。
direct exchange
在RabbitMQ札记-RabbitMQ入门一文中,我们曾学习过RabbitMQ的概念模型,其中就介绍过Exchange分发策略中的direct。direct意为如果Routing key匹配, 那么Message就会被传递到相应的Queue中。比如Routing key为key的Exchange可以分发消息到Routing key为key的Queue。
从图中我们可以看到两个队列绑定的直接交换。第一个队列用绑定键key1绑定,第二个队列有两个绑定,一个绑定键为key2,另一个为key3。通过路由键key1发布到Exchange的消息将被路由到队列Queue1。通过路由键key2或key3发布到Exchange的消息将被路由到队列Queue2。其他消息将被丢弃。
多个绑定
绑定多个队列是完全合法的。
在这种情况下,直接交换就像广播一样,将消息广播到所有的匹配队列。
最终版本
Producer.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String EXCHANGE_NAME = "direct_logs";
private final static String ROUTING_KEY = "key1";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建一个到服务器的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("yst");
factory.setPassword("yst");
factory.setHost("192.168.17.64");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//发布消息到我们的exchange,而不是队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 发送的消息
String message = "Hello World.";
// 发消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println("Sent:" + ROUTING_KEY + ":" + message);
// 关闭渠道和连接;
channel.close();
conn.close();
}
}
Consumer.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private final static String EXCHANGE_NAME = "direct_logs";
private final static String ROUTING_KEY = "key1";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建一个到服务器的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("yst");
factory.setPassword("yst");
factory.setHost("192.168.17.64");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//获取channel绑定的队列的名字
String queueName = channel.queueDeclare().getQueue();
//将queue与exchange绑定
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("Waiting for messages.");
// 创建队列消费者
final DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received:" + envelope.getRoutingKey() + "':'" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
Consumer2.java
将Consumer.java复制一份,命名为Consumer2.java。将其中的ROUTING_KEY改为“key2”。
运行两个Consumer实例,称其为C1和C2,运行一个Consumer2实例,称其为C3。运行Producer发送消息。观察运行结果
C1:
Waiting for messages.
Received:key1':'Hello World.
C2:
Waiting for messages.
Received:key1':'Hello World.
C3:
Waiting for messages.
结果验证了上述直接交换和多个绑定中的内容。