RabbitMQ Management Study Notes

I. Downloading and installing RabbitMQ

(1) Go to the official RabbitMQ site: https://www.rabbitmq.com/

(2) Click Get Started.

(3) Open the download page: https://www.rabbitmq.com/download.html

(4) On that page, pick the version you want.

(5) Install the dependencies

Dependencies

RabbitMQ requires a 64-bit supported version of Erlang for Windows to be installed. Erlang releases include a Windows installer. Erlang Solutions provide binary 64-bit builds of Erlang as well. Note that the Erlang version must be compatible with the RabbitMQ version.

(6) Download Erlang: https://www.erlang.org/downloads

Then download and install OTP 22.1.

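II. Installing RabbitMQ on Linux

1. Prepare the directory

cd /usr/local/src

mkdir rabbitmq

cd rabbitmq

2. Add the repository

To reduce installation errors, we install from a package repository (similar in idea to a Maven repository).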

vi /etc/yum.repos.d/rabbitmq-erlang.repo

[rabbitmq-erlang]

name=rabbitmq-erlang

baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7

gpgcheck=1

gpgkey=https://dl.bintray.com/rabbitmq/keys/rabbitmq-release-signing-key.asc

repo_gpgcheck=0

enabled=1

For matching versions, see:

https://github.com/rabbitmq/erlang-rpm

3. Install Erlang

sudo yum install erlang

Verify the installation:

erl

2.2 Install RabbitMQ

Site: http://www.rabbitmq.com/install-rpm.html

RabbitMQ-Server packages are specific to the Linux release, so check yours with cat /etc/issue or cat /etc/redhat-release.

Download: https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.2/rabbitmq-server-3.7.2-1.el6.noarch.rpm

Upload the package to the rabbitmq directory created above.

Install:

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

yum install rabbitmq-server-3.7.2-1.el6.noarch.rpm

2.2.1 Start, stop, and restart

service rabbitmq-server start

service rabbitmq-server stop

service rabbitmq-server restart

2.2.2 Enable start on boot

chkconfig rabbitmq-server on

2.2.3 Set up the configuration file

cd /etc/rabbitmq

cp /usr/share/doc/rabbitmq-server-3.7.2/rabbitmq.config.example /etc/rabbitmq/

mv rabbitmq.config.example rabbitmq.config

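2.2.4 Allow remote access for users

vi /etc/rabbitmq/rabbitmq.config

Uncomment the relevant setting (in the example file this is typically the {loopback_users, []} entry) and be sure to remove the trailing comma.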

2.2.5 Enable the web management UI

rabbitmq-plugins enable rabbitmq_management

service rabbitmq-server restart

2.2.6 Open port 15672 in the firewall

/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT

/etc/rc.d/init.d/iptables save

Code example:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rabbitmqdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.2</version>
        </dependency>
    </dependencies>
</project>

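1. Simple queue

       1.1 Model

P: the message producer

Red box: the queue

C: the consumer

Three objects are involved: producer, queue, and consumer.

1.2 Getting an MQ connection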

package com.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Utility for obtaining an MQ connection
 * @author  bitao
 */
public class ConnectionUtil {

    public static Connection getConnection() throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the server address
        factory.setHost("122.51.32.202");
        // AMQP port 5672
        factory.setPort(5672);
        // Virtual host
        factory.setVirtualHost("/vhost_mmr");
        // Username
        factory.setUsername("admin");
        // Password
        factory.setPassword("admin");
        return  factory.newConnection();
    }
}
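Producer example:

Consumer example:

1.5 Drawbacks of the simple queue

Tight coupling: the producer maps one-to-one to a consumer (if I want several consumers to consume the messages in the queue, this setup cannot do it), and if the queue name changes, both sides have to change at the same time.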

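2. Work queues

2.1 Model

Why work queues exist

The simple queue is one-to-one. In real development, sending a message costs the producer almost nothing, while the consumer usually has business logic attached and needs time to process each message it receives. Messages therefore pile up in the queue.

2.2 Producer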

package com.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work queue producer.
 *
 *                        |---C1
 *  P------------Queue----|---C2
 */
public class Send {
    // Must match the queue name used by the consumers
    private static final String QUEUE_NAME = "test_simple_queque";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for(int i=0;i<50;i++) {
            String msg="hello"+i;
            System.out.println("send msg"+msg);
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            Thread.sleep(i*20);
        }
        channel.close();
        connection.close();
    }
}

Consumer 1

package com.rabbitmq.work;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 1
 */
public class Recv1 {
    private static final String QUEUE_NAME = "test_simple_queque";
    public static void main(String [] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Define a consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("Recv msg"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

Consumer 2

package com.rabbitmq.work;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 2
 */
public class Recv2 {
    private static final String QUEUE_NAME = "test_simple_queque";
    public static void main(String [] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Define a consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("Recv msg"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

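Observed behavior

Consumer 1 and consumer 2 each process the same number of messages.

Consumer 1: the even-numbered messages

Consumer 2: the odd-numbered messages

This is called round-robin dispatch. No matter which consumer is busy and which is idle, neither is ever given an extra message; messages are always handed out alternately, one for you, one for me.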
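To make the alternating pattern concrete, here is a small stand-alone sketch that does not talk to RabbitMQ at all. It only imitates the dispatch rule described above; the class name RoundRobinDemo and the method dispatch are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinDemo {

    /**
     * Hands message i to consumer (i % consumerCount), regardless of how
     * busy each consumer is. Returns the message indexes per consumer.
     */
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> assigned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            assigned.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            assigned.get(i % consumerCount).add(i);
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = dispatch(6, 2);
        System.out.println("consumer 1: " + result.get(0)); // [0, 2, 4]
        System.out.println("consumer 2: " + result.get(1)); // [1, 3, 5]
    }
}
```

Processing time never enters the calculation, which is exactly why a slow consumer ends up with as many messages as a fast one.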

3. Fair dispatch

Producer

 

package com.rabbitmq.workfair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

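/**
 * Producer for fair dispatch.
 *
 *                        |---C1
 *  P------------Queue----|---C2
 */
public class Send {
    // Must match the queue name used by the consumers
    private static final String QUEUE_NAME = "test_simple_queque";
    public static  void main(String [] args) throws IOException, TimeoutException, InterruptedException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*
         * Fair dispatch:
         * until a consumer sends its acknowledgment, the queue does not send it
         * the next message, so each consumer handles one message at a time.
         * basicQos(1) limits each consumer to one unacknowledged message.
         * Note: basicQos applies to consumers on the channel it is called on,
         * so the setting that takes effect is the one made in the consumers.
         */
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);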

        for(int i=0;i<50;i++) {
            String msg="hello"+i;
            System.out.println("send msg"+msg);
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            Thread.sleep(i*20);
        }
        channel.close();
        connection.close();
    }
}

Consumer 1

package com.rabbitmq.workfair;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 1 (fair dispatch)
 */
public class Recv1 {
    private static final String QUEUE_NAME = "test_simple_queque";
    public static void main(String [] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Deliver at most one unacknowledged message at a time
        channel.basicQos(1);
        // Define a consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("Recv msg"+msg);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done");
                    // Acknowledge manually so the broker can send the next message
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Manual acknowledgment is required for basicQos to take effect
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

Consumer 2

package com.rabbitmq.workfair;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 2 (fair dispatch)
 */
public class Recv2 {
    private static final String QUEUE_NAME = "test_simple_queque";
    public static void main(String [] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Deliver at most one unacknowledged message at a time
        channel.basicQos(1);
        // Define a consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("Recv msg"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                    // Acknowledge manually so the broker can send the next message
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Manual acknowledgment instead of automatic acknowledgment
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

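Observed behavior: the faster consumer handles more messages than the slower one (here consumer 1 sleeps 200 ms per message and consumer 2 sleeps 2000 ms, so consumer 1 handles more). Whoever is more capable does more work.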
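The effect of a prefetch count of 1 with manual acknowledgment can be imitated without a broker. The sketch below is a simplified model under stated assumptions: all messages are already waiting in the queue, a consumer gets its next message only once it has finished the previous one, and ties go to the consumer with the lower index. FairDispatchDemo and dispatch are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.Arrays;

public class FairDispatchDemo {

    /**
     * Simulates prefetch = 1: each message goes to the consumer that becomes
     * free first. Returns how many messages each consumer handled.
     */
    static int[] dispatch(int messageCount, long[] processingMillis) {
        int n = processingMillis.length;
        long[] freeAt = new long[n];   // simulated time at which each consumer is idle again
        int[] handled = new int[n];
        for (int m = 0; m < messageCount; m++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += processingMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Same per-message delays as the two consumers above: 200 ms and 2000 ms
        int[] handled = dispatch(12, new long[] {200, 2000});
        System.out.println(Arrays.toString(handled));
    }
}
```

In this model the 200 ms consumer takes the large majority of the messages, which is the behavior fair dispatch is meant to produce.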

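4. Message acknowledgment and message durability

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);

boolean autoAck = true; (automatic acknowledgment mode): as soon as RabbitMQ dispatches a message to a consumer, it deletes the message from memory.

In this mode, if you kill a consumer while it is working, the message it was processing is lost.

boolean autoAck = false; (manual mode): if a consumer dies, its unacknowledged messages are delivered to another consumer. RabbitMQ supports message acknowledgments: the consumer sends an acknowledgment to tell RabbitMQ that the message has been processed and can be deleted, and only then does RabbitMQ delete the message from memory.

Message acknowledgment is on by default, that is, autoAck is false. The RabbitMQ documentation calls this feature message acknowledgment.

But consider: if the RabbitMQ server itself goes down, our messages are still lost!

Message durability

// Declare the queue

boolean durable = false;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

Simply changing boolean durable = false; to true in an existing program does not work. The code is correct, but it fails at runtime, because a queue named test_work_queue has already been declared as non-durable and RabbitMQ does not allow an existing queue to be redeclared with different parameters. Either delete the existing queue first or declare the durable queue under a new name.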

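5. Publish/subscribe mode

Model

Explanation:

1. One producer, multiple consumers.

2. Each consumer has its own queue.

3. The producer does not send messages directly to a queue; it sends them to an exchange.

4. Every queue must be bound to the exchange.

5. A message sent by the producer passes through the exchange to the queues, so one message can be consumed by multiple consumers.

Example: user registration -> send email -> send SMS.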
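The five points above can be pictured with a tiny in-memory model. This is only an illustration of the fanout idea, not how RabbitMQ is implemented; the class FanoutDemo and its methods bind, publish, and poll are invented for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

public class FanoutDemo {
    // One in-memory queue per bound queue name
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange (creates it if needed). */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Copies the message into every bound queue; no routing key is involved. */
    void publish(String message) {
        for (Deque<String> queue : queues.values()) {
            queue.addLast(message);
        }
    }

    /** Takes the next message from one queue, or null if it is empty or unknown. */
    String poll(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        FanoutDemo exchange = new FanoutDemo();
        exchange.bind("email");
        exchange.bind("sms");
        exchange.publish("user registered");
        System.out.println("email queue got: " + exchange.poll("email"));
        System.out.println("sms queue got: " + exchange.poll("sms"));
    }
}
```

Each queue receives its own copy, so the email consumer and the SMS consumer both see the registration event.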

Producer

package com.rabbitmq.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Producer for publish/subscribe mode
 */
public class Send {
    // Must match the exchange name the consumers bind to
    private static final String EXCHANGE_NAME="text_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        // Declare the exchange
        // fanout: every bound queue receives a copy of each message
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        // Publish the message
        String msg="hello ps";

        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());

        System.out.println("send:"+msg);
        channel.close();
        connection.close();
    }
}

 

 

Consumer 1

 

package com.rabbitmq.ps;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 1 (email queue)
 */
public class Recv1 {
    private static final String QUEUE_NAME = "test_queue_topic_email";
    private static final String EXCHANGE_NAME="text_exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        // Deliver at most one unacknowledged message at a time
        channel.basicQos(1);
        // Define a consumer
        Consumer  consumer=new DefaultConsumer(channel){
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              String msg = new String(body,"utf-8");
                System.out.println("Recv msg[1]"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        boolean autoAck = false;
        // Pass the constant, not the string literal "QUEUE_NAME"
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

Consumer 2

package com.rabbitmq.ps;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 2 (SMS queue)
 */
public class Recv2 {
    private static final String QUEUE_NAME = "test_queue_topic_sms";
    private static final String EXCHANGE_NAME="text_exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        // Deliver at most one unacknowledged message at a time
        channel.basicQos(1);
        // Define a consumer
        Consumer  consumer=new DefaultConsumer(channel){
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              String msg = new String(body,"utf-8");
                System.out.println("Recv msg[2]"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck = false;
        // Pass the constant, not the string literal "QUEUE_NAME"
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

 

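6. Exchange

An exchange receives messages from producers on one side and pushes them to queues on the other.

Anonymous forwarding (the default exchange, whose name is the empty string "")

Fanout (ignores the routing key)

Routing mode

Model

Producer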

 

package com.rabbitmq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc  Producer for routing mode
 */
public class Send {
    private static final String EXCHANGE_NAME="test_exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare a direct exchange
        String test_direct="direct";
        channel.exchangeDeclare(EXCHANGE_NAME,test_direct);

        String msg = "hello direct";
        String routingKey = "error";
        channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
         System.out.println("send msg:"+msg);
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Consumer 1

package com.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 1 (bound to "error" only)
 */
public class Recv1 {
    private static final String EXCHANGE_NAME="test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

       Consumer consumer =  new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               String msg = new String(body,"utf-8");
               System.out.println("[1] Recv msg:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Manual acknowledgment mode
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

Consumer 2

package com.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 2 (bound to "info", "error", and "warning")
 */
public class Recv2 {
    private static final String EXCHANGE_NAME="test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_2";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();

        final Channel channel = connection.createChannel();

        // Pass the constant, not the string literal "QUEUE_NAME"
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               String msg = new String(body,"utf-8");
                System.out.println("[2] Recv msg:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Manual acknowledgment mode
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);


    }
}

 

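Topic exchange

The routing key is matched against a pattern. Keys are split into words separated by dots.

# matches zero or more words

* matches exactly one word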
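To see how the two wildcards behave, here is a simplified matcher written for these notes. It is a sketch of the matching rule only (dot-separated words, * for exactly one word, # for zero or more words) and not the broker's actual implementation; TopicMatcher and matches are hypothetical names.

```java
public class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it is a match only if all words were consumed too
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // Pattern still expects a word but none is left
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.#", "topic.add"));       // true
        System.out.println(matches("topic.*", "topic.add.item"));  // false
        System.out.println(matches("topic.#", "topic.add.item"));  // true
    }
}
```

With these rules a binding of topic.# receives everything under topic, while topic.* only receives keys with exactly one word after topic.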

Topic

Model

Example: goods: publish, delete, update, query, and so on.

Producer

package com.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Producer for topic mode
 */
public class Send {
    private static final String EXCHANGE_NAME="test_exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare a topic exchange
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        String msg = "hello topic";
        // Matches the "topic.add" and "topic.#" bindings used by the consumers
        String routingKey = "topic.add";
        channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
        System.out.println("send msg:"+msg);
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Consumer 1

package com.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 1 (bound to "topic.add")
 */
public class Recv1 {
    private static final String EXCHANGE_NAME="test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConnection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"topic.add");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("[1] Recv msg:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Manual acknowledgment mode
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

Consumer 2

package com.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author bitao
 * @desc Consumer 2 (bound to "topic.#" and the plain keys)
 */
public class Recv2 {
    private static final String EXCHANGE_NAME="test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_2";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();

        final Channel channel = connection.createChannel();

        // Pass the constant, not the string literal "QUEUE_NAME"
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"topic.#");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("[2] Recv msg:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // Manual acknowledgment mode
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

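RabbitMQ message confirmation (transactions + confirm)

In RabbitMQ, persisting data solves the problem of losing data when the RabbitMQ server fails.

Problem: after the producer sends a message, by default it has no way of knowing whether the message actually reached the RabbitMQ server.

There are two ways to address this:

 The transaction mechanism that AMQP provides

 Confirm mode

Transaction mechanism

txSelect txCommit txRollback

txSelect: puts the current channel into transaction mode

txCommit: commits the transaction

txRollback: rolls back the transaction

Producer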

package com.rabbitmq.tx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author
 * @desc Transactional producer
 */
public class TxSend {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        String msg = "hello tx messge";
        try {
            channel.txSelect();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("send msg"+msg);
            channel.txCommit();
        }catch (Exception e) {
            channel.txRollback();
            System.out.println("send message txRollback");
        }
        channel.close();
        connection.close();


    }
}

Consumer

package com.rabbitmq.tx;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author
 * @desc Consumer for the transaction example
 */
public class TxRecv {
    private static final String QUEUE_NAME = "test_queue_tx";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              System.out.println("recv[tx] msg:"+ new String(body,"utf-8"));
            }
        });
    }
}

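This mode is quite time-consuming, and using it reduces RabbitMQ's message throughput.

Confirm mode

How producer-side confirm mode works

The producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID (starting from 1). Once a message has been delivered to all matching queues, the broker sends a confirmation to the producer (containing the message's unique ID), so the producer knows the message has correctly reached its destination queues. If the message and the queue are durable, the confirmation is sent after the message has been written to disk. The delivery-tag field of the confirmation that the broker sends back contains the sequence number of the confirmed message. The broker can also set the multiple field of basic.ack to indicate that all messages up to and including this sequence number have been handled.

The biggest advantage of confirm mode is that it is asynchronous.

Nack: if the broker is unable to handle a message, it sends a nack instead of an ack.

Enabling confirm mode

channel.confirmSelect()

Programming patterns

1. Simple: publish one message, then call waitForConfirms()

2. Batch: publish a batch of messages, then call waitForConfirms()

3. Asynchronous confirm mode: provide a callback method

Confirm: single message

Producer: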

package com.rabbitmq.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author
 * @desc Simple confirm mode
 */
public class Send1 {
    private static final String QUEUE_NAME = "test_queue_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // The producer calls confirmSelect to put the channel into confirm mode
        channel.confirmSelect();

        String msg= "hello confirm message";
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

        if(!channel.waitForConfirms()) {
            System.out.println("message send failed");
        }else {
            System.out.println("message send ok");
        }
        channel.close();
        connection.close();
    }
}

Consumer

package com.rabbitmq.confirm;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer for the confirm examples
 */
public class Recv {
    private static final  String QUEUE_NAME = "test_queue_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv[confirm] msg:" + new String(body,"utf-8"));
            }
        });
    }
}

Batch

package com.rabbitmq.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author
 * @desc Batch confirm mode
 */
public class Send2 {
    private static final String QUEUE_NAME ="test_queue_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.confirmSelect();
        String msg = "hello confirm message";
        // Publish a batch
        for(int i = 0; i<10;i++) {
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        // Wait once for the whole batch to be confirmed
        if(!channel.waitForConfirms()) {
            System.out.println("message send failed");
        }else {
            System.out.println("message send ok");
        }
        channel.close();
        connection.close();
    }
}

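Asynchronous mode

The ConfirmListener callback that the Channel object provides only carries the deliveryTag (the sequence number of a message published on the current Channel). We therefore have to maintain, for each Channel, our own set of unconfirmed message sequence numbers. Each time a message is published, one element is added to the set; each time handleAck is called, the corresponding single entry (multiple = false) or several entries (multiple = true) are removed from it. For efficiency, this unconfirmed set is best kept in an ordered structure such as SortedSet.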
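The bookkeeping described above can be tried out on its own, without a broker. The sketch below only models the unconfirmed set; UnconfirmedTracker and its methods published, confirmed, and isPending are hypothetical helpers written for these notes, and the sequence numbers are fed in by hand rather than taken from a real channel.

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class UnconfirmedTracker {
    // Sequence numbers that have been published but not yet confirmed
    private final SortedSet<Long> unconfirmed = new TreeSet<>();

    /** Call when a message with this sequence number is published. */
    synchronized void published(long seqNo) {
        unconfirmed.add(seqNo);
    }

    /** Call from the ack (or nack) callback. */
    synchronized void confirmed(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet is exclusive of its argument, so +1 includes deliveryTag itself
            unconfirmed.headSet(deliveryTag + 1).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
    }

    synchronized boolean isPending(long seqNo) {
        return unconfirmed.contains(seqNo);
    }

    synchronized int pendingCount() {
        return unconfirmed.size();
    }

    public static void main(String[] args) {
        UnconfirmedTracker tracker = new UnconfirmedTracker();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            tracker.published(seqNo);
        }
        tracker.confirmed(3, true);   // confirms 1, 2 and 3 in one go
        tracker.confirmed(5, false);  // confirms only 5
        System.out.println("still pending: " + tracker.pendingCount()); // 1 (sequence number 4)
    }
}
```

An ordered set makes the multiple = true case cheap, because everything up to the confirmed tag can be dropped as one range.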

 

Producer (asynchronous confirm)

package com.rabbitmq.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

/**
 * @desc Message confirmation: asynchronous confirm
 */
public class Send3 {
    private static final String QUEUE_NAME = "test_queue_confirm3";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // The producer calls confirmSelect to put the channel into confirm mode
        channel.confirmSelect();
        // Sequence numbers of messages that have not been confirmed yet
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        // Register a confirm listener on the channel
        channel.addConfirmListener(new ConfirmListener() {
            // handleAck: the broker handled the message(s) successfully
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if(multiple) {
                    System.out.println("------handleAck-----------multiple------");
                    confirmSet.headSet(deliveryTag+1).clear();
                }else {
                    System.out.println("------handleAck-----------multiple false------");
                    confirmSet.remove(deliveryTag);
                }
            }
            // handleNack: the broker could not handle the message(s);
            // a real application would resend or log them here
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if(multiple) {
                    System.out.println("------handleNack-----------------multiple--------");
                    confirmSet.headSet(deliveryTag+1).clear();
                }else {
                    System.out.println("-------handleNack-----------------multiple false--------------");
                    confirmSet.remove(deliveryTag);
                }
            }
        });
        String msgSet= "sssssss";
        // Publishes in an endless loop for demonstration; stop the program manually
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            // Record the sequence number before publishing so a fast confirm cannot arrive first
            confirmSet.add(seqNo);
            channel.basicPublish("",QUEUE_NAME,null,msgSet.getBytes());
        }
    }
}

Consumer (asynchronous confirm)

package com.rabbitmq.confirm;

import com.rabbitmq.client.*;
import com.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer for the asynchronous confirm example
 */
public class Recv {
    private static final  String QUEUE_NAME = "test_queue_confirm3";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv[confirm] msg:" + new String(body,"utf-8"));
            }
        });
    }
}

Integrating RabbitMQ with Spring

Producer

Consumer

 SpringApplication.xml configuration file

Example code:

https://github.com/javawebservlet/rabbitmqdemo.git 
