非路由消息
生产者发布消息时,在默认情况下,并不知道消息是否发布成功,是否正确路由到相应的队列。本文的目的在于如何处理路由失败的消息。
如何处理
1. 设置mandatroy=true;默认情况是false,设置为true的目的是告诉rabbitmq当交换器路由不到合适的队列时,不是将该消息丢失,而且将这个消息返回到生产者(basic.return);
2. 添加ReturnListener监听器;就是一个回调监听器,就是处理失败路由的消息;
案例
代码
package com.rabbitmq.noroutable;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by wangzhiping on 17/7/4.
*/
public class ProducerTest {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(6002);
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare("ex-noroutable", BuiltinExchangeType.TOPIC, false);
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode: " + replyCode + ", replyText: " + replyText + ", routingKey: " + routingKey
+ ", properties: " + properties + ", body: " + new String(body, "UTF-8"));
}
});
channel.basicPublish("ex-noroutable", "", true,false, null, "noroutable".getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
QA
这里需要特别注意的是addReturnListener是一个非阻塞回调接口,如果提前使用了channel.close(),会关闭管道导致无法处理回调消息。