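I. Downloading and installing RabbitMQ
(1) Go to the official RabbitMQ site: https://www.rabbitmq.com/
(2) Click "Get Started", as shown in the figure above.
(3) Click the download link: https://www.rabbitmq.com/download.html
(4) On the download page, pick the version you want.
(5) Install the dependency first:
Dependencies
RabbitMQ requires a 64-bit supported version of Erlang for Windows to be installed. Erlang releases include a Windows installer. Erlang Solutions provide binary 64-bit builds of Erlang as well. Note that the Erlang version must match the RabbitMQ version.
(6) Download Erlang: https://www.erlang.org/downloads
Then choose OTP 22.1, download it, and install it.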
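II. Installing RabbitMQ on Linux
1. Prepare the directory
cd /usr/local/src
mkdir rabbitmq
cd rabbitmq
2. Add the repository
To reduce installation errors, we install from a package repository (similar in spirit to a Maven repository).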
vi /etc/yum.repos.d/rabbitmq-erlang.repo
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1
For the matching versions, see:
https://github.com/rabbitmq/erlang-rpm
3. Install Erlang
sudo yum install erlang
Verify the installation:
erl
2.2 Install RabbitMQ
Website: http://www.rabbitmq.com/install-rpm.html
RabbitMQ-Server packages differ by Linux version; use cat /etc/issue or cat /etc/redhat-release to check which Linux version you are running.
Download: https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.2/rabbitmq-server-3.7.2-1.el6.noarch.rpm
Upload the package to the rabbitmq directory created above.
Install:
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.7.2-1.el6.noarch.rpm
2.2.1 Start, stop, restart
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
2.2.2 Start on boot
chkconfig rabbitmq-server on
2.2.3 Create the configuration file
cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.7.2/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
2.2.4 Allow remote user access
vi /etc/rabbitmq/rabbitmq.config
Uncomment the relevant entry (in the stock example file this is normally the {loopback_users, []} line) and be sure to remove the trailing comma after it.
2.2.5 Enable the web management plugin
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
2.2.6 Open port 15672 in the firewall
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
Code example:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rabbitmqdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.2</version>
</dependency>
</dependencies>
</project>
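1. Simple queue
1.1 Model
P: the message producer
Red box: the queue
C: the consumer
Three objects are involved: producer, queue, and consumer.
1.2 Obtaining an MQ connection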
package com.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Utility for obtaining an MQ connection.
* @author bitao
*/
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
// Create a connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the server address
factory.setHost("122.51.32.202");
// AMQP port 5672
factory.setPort(5672);
// Virtual host
factory.setVirtualHost("/vhost_mmr");
// User name
factory.setUsername("admin");
// Password
factory.setPassword("admin");
return factory.newConnection();
}
}
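Producer code example:
Consumer code example:
1.5 Shortcomings of the simple queue
Coupling is high: each producer maps to exactly one consumer (if I want several consumers to consume messages from the queue, this model cannot do it), and if the queue name changes, both sides have to be changed together.
2. Work queues
2.1 Model
Why work queues exist
The simple queue is one-to-one. In real projects, sending a message costs the producer almost nothing, whereas the consumer usually has business logic to run and may need time to process each message, so messages pile up in the queue.
2.2 Producer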
package com.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Producer for the work-queue example.
*
*                       |---C1
* P------------Queue----|---C2
*/
public class Send {
private static final String QUEUE_NAME = "test_work_simple";
public static void main(String [] args) throws IOException, TimeoutException, InterruptedException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for(int i=0;i<50;i++) {
String msg="hello"+i;
System.out.println("send msg"+msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
Consumer 1
package com.rabbitmq.work;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 1.
* @author bitao
*/
public class Recv1 {
// Must match the queue name used by the producer
private static final String QUEUE_NAME = "test_work_simple";
public static void main(String [] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// Define a consumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv msg"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
Consumer 2
package com.rabbitmq.work;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 2.
* @author bitao
*/
public class Recv2 {
// Must match the queue name used by the producer
private static final String QUEUE_NAME = "test_work_simple";
public static void main(String [] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// Define a consumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv msg"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
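Observed behavior
Consumer 1 and consumer 2 process the same number of messages.
Consumer 1: the even-numbered messages
Consumer 2: the odd-numbered messages
This is called round-robin dispatch: no matter which consumer is busy and which is idle, neither is given an extra message; the messages are always handed out alternately, one for you, one for me.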
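The alternating pattern can be reproduced without a broker. The following is a minimal, self-contained sketch in plain Java (no RabbitMQ client involved). The class name RoundRobinDemo and the dispatch method are hypothetical names made up for this illustration; the code only models the dispatch rule, not real message delivery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: models round-robin dispatch, not the RabbitMQ client.
class RoundRobinDemo {
    // Hands message i to consumer (i % consumerCount), regardless of how busy that consumer is.
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            received.get(i % consumerCount).add(i);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = dispatch(50, 2);
        System.out.println("consumer 1: " + received.get(0));
        System.out.println("consumer 2: " + received.get(1));
    }
}
```

With 50 messages and two consumers, each list ends up with 25 entries, which is why a slow consumer still receives exactly as much work as a fast one.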
3. Fair dispatch
Producer
package com.rabbitmq.workfair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
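/**
* Producer for the fair-dispatch example.
*
*                       |---C1
* P------------Queue----|---C2
*/
public class Send {
private static final String QUEUE_NAME = "test_work_simple";
public static void main(String [] args) throws IOException, TimeoutException, InterruptedException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* Fair dispatch:
* until a consumer has acknowledged its current message, the queue does not send it the next one,
* so each consumer handles one message at a time (at most one unacknowledged message per consumer).
* Note: the prefetch limit applies to consumers on the channel where it is set,
* so each consumer must also call basicQos(1) on its own channel.
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);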
for(int i=0;i<50;i++) {
String msg="hello"+i;
System.out.println("send msg"+msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
Consumer 1
package com.rabbitmq.workfair;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 1 (fast worker: 200 ms per message).
* @author bitao
*/
public class Recv1 {
// Must match the queue name used by the producer
private static final String QUEUE_NAME = "test_work_simple";
public static void main(String [] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
final Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// Receive only one unacknowledged message at a time
channel.basicQos(1);
// Define a consumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv msg"+msg);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
// Acknowledge manually once processing is finished
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// Fair dispatch requires manual acknowledgment
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
Consumer 2
package com.rabbitmq.workfair;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 2 (slow worker: 2 s per message).
* @author bitao
*/
public class Recv2 {
// Must match the queue name used by the producer
private static final String QUEUE_NAME = "test_work_simple";
public static void main(String [] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
final Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// Receive only one unacknowledged message at a time
channel.basicQos(1);
// Define a consumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv msg"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
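Observed behavior: consumer 1 (200 ms per message) processes more messages than consumer 2 (2 s per message); the more capable worker does more.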
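To see why the faster consumer ends up with more work, here is a small broker-free simulation of "one unacknowledged message per consumer". It is only a sketch under simplifying assumptions: all 50 messages are assumed to be waiting in the queue already, a consumer is handed the next message the moment it acknowledges the previous one, and ties go to the lower-numbered consumer. FairDispatchDemo and its dispatch method are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.PriorityQueue;

// Hypothetical illustration only: models prefetch = 1 with manual acks, not the RabbitMQ client.
class FairDispatchDemo {
    // costMillis[c] is the time consumer c needs per message.
    // Returns how many messages each consumer ends up processing.
    static int[] dispatch(int messageCount, long[] costMillis) {
        int[] handled = new int[costMillis.length];
        // Each entry is {time at which the consumer becomes free, consumer index}.
        PriorityQueue<long[]> free = new PriorityQueue<>((a, b) ->
                a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int c = 0; c < costMillis.length; c++) {
            free.add(new long[] {0L, c});
        }
        for (int i = 0; i < messageCount; i++) {
            long[] next = free.poll();      // the consumer that becomes free (acks) first
            int c = (int) next[1];
            handled[c]++;
            free.add(new long[] {next[0] + costMillis[c], c});
        }
        return handled;
    }

    public static void main(String[] args) {
        // 200 ms and 2000 ms mirror the sleep times of the two consumers above.
        int[] handled = dispatch(50, new long[] {200L, 2000L});
        System.out.println("consumer 1 handled " + handled[0]);
        System.out.println("consumer 2 handled " + handled[1]);
    }
}
```

The real split depends on how fast the producer publishes, so the exact counts from a live run will differ; the point is only that work follows availability rather than strict alternation.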
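4. Message acknowledgment and message durability
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
boolean autoAck = true; (automatic acknowledgment mode): as soon as RabbitMQ dispatches a message to a consumer, it removes the message from memory.
In this mode, if you kill a consumer that is still working, the message it was processing is lost.
boolean autoAck = false; (manual mode): if a consumer dies, its unacknowledged messages are redelivered to other consumers. RabbitMQ supports message acknowledgment: the consumer sends an acknowledgment telling RabbitMQ that the message has been processed and can be deleted, and only then does RabbitMQ remove it from memory.
Message acknowledgment is on by default, that is, autoAck defaults to false.
But think about it: if the RabbitMQ server itself goes down, our messages are still lost!
Message durability
// Declare the queue
boolean durable = false;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
Simply changing boolean durable = false to true in this program does not work. The code is correct, yet it will not run successfully, because a queue with this name (test_work_simple above) has already been declared as non-durable, and RabbitMQ does not allow an existing queue to be redeclared with different parameters. Delete the existing queue first, or declare the durable queue under a new name.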
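5. Publish/subscribe mode
Model
Explanation:
1. One producer, multiple consumers
2. Each consumer has its own queue
3. The producer does not send messages directly to a queue; it sends them to an exchange
4. Every queue must be bound to the exchange
5. A message sent by the producer passes through the exchange to the queues, so one message can be consumed by multiple consumers
Example: user registration -> send email -> send SMS
Producer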
package com.rabbitmq.ps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Publish/subscribe mode: producer.
* @author bitao
*/
public class Send {
private static final String EXCHANGE_NAME="text_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// Declare the exchange
// Fanout: broadcast to every bound queue
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// Publish the message
String msg="hello ps";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();
}
}
Consumer 1
package com.rabbitmq.ps;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer that receives messages on the email queue.
* @author bitao
*/
public class Recv1 {
private static final String QUEUE_NAME = "test_queue_fanout_email";
// Must match the exchange declared by the producer
private static final String EXCHANGE_NAME="text_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// Bind the queue to the exchange
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
// Deliver only one message at a time
channel.basicQos(1);
// Define a consumer
Consumer consumer=new DefaultConsumer(channel){
// Called when a message arrives
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv msg[1]"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
Consumer 2
package com.rabbitmq.ps;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer that receives messages on the SMS queue.
* @author bitao
*/
public class Recv2 {
private static final String QUEUE_NAME = "test_queue_fanout_sms";
// Must match the exchange declared by the producer
private static final String EXCHANGE_NAME="text_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// Bind the queue to the exchange
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
// Deliver only one message at a time
channel.basicQos(1);
// Define a consumer
Consumer consumer=new DefaultConsumer(channel){
// Called when a message arrives
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv msg[2]"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
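6. Exchange
An exchange receives messages from producers on one side and pushes them to queues on the other.
Anonymous forwarding: the default exchange, addressed with the empty name ""
Fanout (ignores the routing key)
Routing mode
Model
Producer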
package com.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Producer, routing mode.
* @author bitao
*/
public class Send {
private static final String EXCHANGE_NAME="test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
Channel channel = connection.createChannel();
// Exchange type
String test_direct="direct";
channel.exchangeDeclare(EXCHANGE_NAME,test_direct);
String msg = "hello direct";
String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("send msg:"+msg);
// Close the channel and the connection
channel.close();
connection.close();
}
}
Consumer 1
package com.rabbitmq.routing;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 1, routing mode.
* @author bitao
*/
public class Recv1 {
private static final String EXCHANGE_NAME="test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
final Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// Manual acknowledgment mode
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
Consumer 2
package com.rabbitmq.routing;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 2, routing mode.
* @author bitao
*/
public class Recv2 {
private static final String EXCHANGE_NAME="test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[2] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// Manual acknowledgment mode
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
Topic exchange
Matches the routing key against a pattern.
# matches zero or more words
* matches exactly one word
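The two wildcard rules can be checked with a few lines of plain Java. This is a simplified sketch of the matching rules only, written for illustration; it is not how the broker implements topic routing, and TopicMatchDemo and matches are hypothetical names. Words are assumed to be separated by dots, as in topic.add.

```java
// Hypothetical illustration only: a simplified matcher for topic binding patterns.
class TopicMatchDemo {
    // Returns true if the routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;               // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int n = j; n <= k.length; n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                       // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);   // '*' or a literal word consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.#", "topic.add.item"));
        System.out.println(matches("topic.*", "topic.add.item"));
    }
}
```

Under these rules topic.# accepts topic, topic.add and topic.add.item, while topic.* accepts only keys with exactly one word after topic.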
Topic
Model
Example: product events such as publish, delete, update, query
Producer
package com.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Producer, topic mode.
* @author bitao
*/
public class Send {
private static final String EXCHANGE_NAME="test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
Channel channel = connection.createChannel();
// Declare a topic exchange
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String msg = "hello topic";
// Matches the bindings "topic.add" and "topic.#" used by the consumers
String routingKey = "topic.add";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("send msg:"+msg);
// Close the channel and the connection
channel.close();
connection.close();
}
}
Consumer 1
package com.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 1, topic mode.
* @author bitao
*/
public class Recv1 {
private static final String EXCHANGE_NAME="test_exchange_topic";
private static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException {
// Get a connection
Connection connection = ConnectionUtil.getConnection();
// Get a channel
final Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"topic.add");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// Manual acknowledgment mode
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
Consumer 2
package com.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer 2, topic mode.
* @author bitao
*/
public class Recv2 {
private static final String EXCHANGE_NAME="test_exchange_topic";
private static final String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"topic.#");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[2] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// Manual acknowledgment mode
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
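RabbitMQ message confirmation mechanism (transactions + confirm)
In RabbitMQ, persisting data protects against message loss when the RabbitMQ server fails.
Problem: after the producer sends a message, by default it has no way of knowing whether the message actually reached the RabbitMQ server.
There are two solutions:
AMQP's transaction mechanism
Confirm mode
Transaction mechanism
txSelect txCommit txRollback
txSelect: puts the current channel into transaction mode
txCommit: commits the transaction
txRollback: rolls back the transaction
Producer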
package com.rabbitmq.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Transactional producer.
*/
public class TxSend {
private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "hello tx message";
try {
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("send msg"+msg);
channel.txCommit();
}catch (Exception e) {
channel.txRollback();
System.out.println("send message txRollback");
}
channel.close();
connection.close();
}
}
Consumer
package com.rabbitmq.tx;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer for the transaction example.
*/
public class TxRecv {
private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[tx] msg:"+ new String(body,"utf-8"));
}
});
}
}
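This mode is quite time-consuming; using it lowers RabbitMQ's message throughput.
Confirm mode
How producer-side confirm mode works
The producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID (starting from 1). Once a message has been delivered to all matching queues, the broker sends the producer a confirmation containing the message's unique ID, so the producer knows the message has reached its destination queues. If the message and the queue are durable, the confirmation is sent only after the message has been written to disk. The delivery-tag field of the confirmation carries the sequence number of the confirmed message; the broker may also set the multiple field of basic.ack, meaning that all messages up to and including that sequence number have been handled.
The biggest advantage of confirm mode is that it is asynchronous.
Nack: if the broker cannot handle a message, it sends a basic.nack instead of a basic.ack.
Enabling confirm mode
channel.confirmSelect()
Programming styles
1. Plain: publish one message, then call waitForConfirms()
2. Batch: publish a batch of messages, then call waitForConfirms()
3. Asynchronous confirm mode: register a callback
Confirm: single message
Producer: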
package com.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Plain (single-message) confirm mode.
*/
public class Send1 {
private static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// The producer calls confirmSelect to put the channel into confirm mode
channel.confirmSelect();
String msg= "hello confirm message";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if(!channel.waitForConfirms()) {
System.out.println("message send failed");
}else {
System.out.println("message send ok");
}
channel.close();
connection.close();
}
}
Consumer
package com.rabbitmq.confirm;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer for the confirm examples.
*/
public class Recv {
private static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[confirm] msg:" + new String(body,"utf-8"));
}
});
}
}
Batch
package com.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Batch confirm mode.
*/
public class Send2 {
private static final String QUEUE_NAME ="test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
String msg = "hello confirm message";
// Publish a batch of messages
for(int i = 0; i<10;i++) {
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
// Wait once for the whole batch to be confirmed
if(!channel.waitForConfirms()) {
System.out.println("message send failed");
}else {
System.out.println("message send ok");
}
channel.close();
connection.close();
}
}
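Asynchronous mode
The ConfirmListener callbacks registered on a Channel receive only the deliveryTag (the sequence number of the message published on that Channel), so we have to maintain, for each Channel, our own set of unconfirmed sequence numbers. Each time a message is published, its sequence number is added to the set; each time handleAck is called, one entry (multiple = false) or several entries (multiple = true) are removed from the set. For efficiency, the unconfirmed set is best kept in an ordered structure such as a SortedSet.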
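The bookkeeping described above can be tried out in isolation. The sketch below uses only the JDK and no broker; UnconfirmedTracker and its methods are hypothetical names chosen for this illustration. It uses a plain TreeSet for brevity, whereas a real confirm listener is invoked on a different thread than the publisher, so the set would need to be thread-safe there.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical illustration only: the bookkeeping behind asynchronous confirms, without a broker.
class UnconfirmedTracker {
    private final SortedSet<Long> unconfirmed = new TreeSet<>();

    // Call right after publishing the message with this sequence number.
    void published(long seqNo) {
        unconfirmed.add(seqNo);
    }

    // Mirrors the handleAck(deliveryTag, multiple) callback.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed.
            unconfirmed.headSet(deliveryTag + 1).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
    }

    // Sequence numbers still waiting for a confirmation.
    SortedSet<Long> pending() {
        return unconfirmed;
    }

    public static void main(String[] args) {
        UnconfirmedTracker tracker = new UnconfirmedTracker();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            tracker.published(seqNo);
        }
        tracker.handleAck(3, true);    // confirms 1, 2 and 3 in one go
        tracker.handleAck(5, false);   // confirms only 5
        System.out.println("still unconfirmed: " + tracker.pending());
    }
}
```

Because the set is sorted, headSet(deliveryTag + 1) selects every sequence number up to the acknowledged one, which is the reason an ordered structure suits multiple = true acknowledgments.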
Producer for the asynchronous confirm example
package com.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
* Message confirmation: asynchronous confirm mode.
*/
public class Send3 {
private static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// The producer calls confirmSelect to put the channel into confirm mode
channel.confirmSelect();
// Sequence numbers of messages that have not been confirmed yet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// Register a confirm listener on the channel
channel.addConfirmListener(new ConfirmListener() {
// handleAck: the broker confirmed the message(s)
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if(multiple) {
System.out.println("------handleAck-----------multiple------");
confirmSet.headSet(deliveryTag+1).clear();
}else {
System.out.println("------handleAck-----------multiple false------");
confirmSet.remove(deliveryTag);
}
}
// handleNack: the broker could not handle the message(s); a real application would typically resend them here
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if(multiple) {
System.out.println("------handleNack-----------------multiple--------");
confirmSet.headSet(deliveryTag+1).clear();
}else {
System.out.println("-------handleNack-----------------multiple false--------------");
confirmSet.remove(deliveryTag);
}
}
});
String msgSet= "sssssss";
while (true) {
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msgSet.getBytes());
confirmSet.add(seqNo);
}
}
}
Consumer for the asynchronous confirm example
package com.rabbitmq.confirm;
import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Consumer for the confirm examples.
*/
public class Recv {
private static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[confirm] msg:" + new String(body,"utf-8"));
}
});
}
}
Integrating RabbitMQ with Spring
Producer
Consumer
SpringApplication.xml configuration file
Example code: