使用Python构建RabbitMQ应用程序是一个强大的方法,用于实现异步消息传递

使用Python构建RabbitMQ应用程序是一个强大的方法,用于实现异步消息传递。RabbitMQ是一种开源的消息代理软件,支持多种消息协议,包括AMQP(高级消息队列协议)。下面是如何使用Python和RabbitMQ进行开发的基本介绍:

安装RabbitMQ服务器

首先,你需要在你的系统上安装RabbitMQ服务器。你可以通过以下命令在Ubuntu上安装RabbitMQ:

sudo apt-get update
sudo apt-get install rabbitmq-server

安装完成后,启动RabbitMQ服务:

sudo systemctl start rabbitmq-server

安装RabbitMQ的Python客户端库

你可以使用pika库来与RabbitMQ进行交互。安装pika库:

pip install pika

创建一个简单的生产者脚本

下面是一个使用pika库发送消息到RabbitMQ的简单示例:

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')

# 发送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")

# 关闭连接
connection.close()

创建一个简单的消费者脚本

下面是一个从队列中接收消息的消费者脚本:

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')

# 定义回调函数来处理消息
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 告诉RabbitMQ使用callback来接收信息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

运行脚本

  1. 先运行消费者脚本,让它开始等待消息。
  2. 然后运行生产者脚本,发送一条消息。
  3. 你应该会看到消费者脚本接收到了消息并打印出来。

在RabbitMQ中实现消息的持久化,可以通过以下步骤进行:

  1. 声明队列时使用持久化:在声明队列时,将durable参数设置为true。这样,即使RabbitMQ服务器重启,队列及其内容也会被保留。
  2. 发布消息时使用持久化:在发布消息时,将deliveryMode参数设置为2(持久化模式)。这样,即使RabbitMQ服务器重启,消息也不会丢失。
  3. 确保消息被确认:为了确保消息被正确处理,可以在发布消息后等待确认。这可以通过设置mandatoryimmediate参数来实现。
  4. 处理消费者断开连接的情况:如果消费者在处理消息时断开连接,可以使用死信队列(DLX)来重新发送未处理的消息。

示例代码如下:

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明持久化队列
channel.queue_declare(queue='my_queue', durable=True)

# 发布持久化消息
channel.basic_publish(
    exchange='',
    routing_key='my_queue',
    body='Hello, World!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 持久化模式
    ))

# 确保消息被确认
channel.basic_ack(delivery_tag=method.delivery_tag)

# 关闭连接
connection.close()

在RabbitMQ中设置死信队列(Dead Letter Queue, DLQ)可以帮助处理那些无法被成功处理的消息。这些消息可能由于各种原因(如消费者处理失败、消息过期等)而需要重新处理或记录。以下是如何在RabbitMQ中设置死信队列的步骤:

  1. 创建主队列和死信队列:
    首先,你需要创建一个普通队列和一个死信队列。例如:

    rabbitmqctl add_queue main_queue
    rabbitmqctl add_queue dlx_queue
    
  2. 配置主队列的死信路由:
    使用RabbitMQ的管理界面或者通过命令行工具rabbitmqctl来配置主队列的死信交换器(Dead Letter Exchange, DLX)。例如:

    rabbitmqctl set_policy TTL "main_queue" '{"dead-letter-exchange":"","dead-letter-routing-key":"dlx_queue"}' --apply-to queues
    

    在这个例子中,TTL是策略名称,可以随意命名。这个策略将指定当消息在主队列中达到某个条件(如过期)时,会被重新发布到指定的死信交换器(这里是默认的空字符串),并使用指定的路由键(这里是dlx_queue)路由到死信队列。

  3. 设置死信交换器和路由键:
    确保你的死信交换器存在并且正确配置了路由键。如果使用的是默认的空字符串作为死信交换器,那么消息将会被路由到同一个连接上的默认交换器。

  4. 验证配置:
    你可以通过RabbitMQ管理界面查看队列和交换器的配置,确保它们已经正确设置。

  5. 测试死信功能:
    向主队列发送消息,并模拟一些会导致消息进入死信队列的场景,比如消息过期、拒绝接收等。然后检查死信队列中是否收到了这些消息。

在RabbitMQ中,为特定队列设置TTL(Time-To-Live)可以通过以下步骤实现:

  1. 声明队列时设置TTL
    在声明队列时,可以使用x-message-ttl参数来设置消息的TTL。这个参数以毫秒为单位,表示消息在队列中存活的最长时间。如果消息在队列中停留的时间超过这个值,它将被自动删除。

  2. 使用队列属性设置TTL
    可以通过AMQP客户端库中的相关方法来设置队列的属性。例如,在使用Python的pika库时,可以在声明队列时传递arguments参数来设置TTL。

  3. 确保消费者处理速度
    需要注意的是,即使设置了TTL,如果消费者处理消息的速度非常慢,消息仍然可能会在队列中停留很长时间。因此,确保消费者的处理速度与消息的生成速度相匹配是非常重要的。

  4. 监控和管理
    定期监控队列的状态和消息的流动情况,以确保TTL设置符合预期效果。如果发现消息经常因为TTL而被丢弃,可能需要调整TTL的值或者优化消费者的处理能力。

  5. 考虑死信队列
    如果某些消息因为TTL而被丢弃,可以考虑将这些消息发送到死信队列(Dead Letter Queue),以便进行进一步的处理或分析。

  6. 测试和验证
    在生产环境中应用TTL之前,建议先在测试环境中进行充分的测试,以确保TTL的行为符合预期,并且不会对系统的稳定性造成影响。

  7. 文档和记录
    记录下关于队列TTL设置的决策过程和理由,这对于未来的维护和问题排查都是非常有帮助的。

  8. 性能考量
    虽然TTL可以帮助管理消息的生命周期,但它也可能增加系统的复杂性和处理开销。因此,在决定是否使用TTL时,需要权衡其带来的便利性与潜在的性能影响。

  9. 兼容性检查
    确保所使用的RabbitMQ版本支持TTL功能,并且客户端库也兼容这一特性。不同版本的RabbitMQ可能在功能上有所差异。

  10. 安全和权限
    确保只有授权的用户和服务能够访问和修改队列的TTL设置,以防止未经授权的操作导致数据丢失或其他安全问题。

RabbitMQ is a powerful messaging broker based on the Advanced Message Queueing Protocol (AMQP). Thanks to the neutral nature of the AMQP spec, it is easy to connect to it from many platforms, including Python. In this blog entry, we will:

Create a simple stock ticker Python application

Create a brokerage Python application that decides when to buy and sell.

Compare pika, an AMQP library created by the RabbitMQ team, with py-amqplib.

You can find all the source code for this blog at http://github.com/gregturn/amqp-demo. This assumes you have already installed RabbitMQ based on instructions for your platform and fired it up. Personally, I have it running on my Mac OS X machine (snow leopard).

By the way:

The code written in this blog entry is for demonstration purposes only. Do not rely on the algorithms for financial advice.

With that out of the way, let’s write some code!
Building the stock ticker

A good example for a messaging solution is a stock ticker system. The stock exchange publishes messages to the broker indicating stock name, price, and time.

import pickle
import random
import time

class Ticker(object):
def init(self, publisher, qname):
self.publisher = publisher

    # This quickly creates four random stock symbols
    chars = range(ord("A"), ord("Z")+1)
    def random_letter(): return chr(random.choice(chars))
    self.stock_symbols = [random_letter()+random_letter()+random_letter() for i in range(4)]

    self.last_quote = {}
    self.counter = 0
    self.time_format = "%a, %d %b %Y %H:%M:%S +0000"
    self.qname = qname

def get_quote(self):
    symbol = random.choice(self.stock_symbols)
    if symbol in self.last_quote:
        previous_quote = self.last_quote[symbol]
        new_quote = random.uniform(0.9*previous_quote, 1.1*previous_quote)
        if abs(new_quote) - 0 < 1.0:
            new_quote = 1.0
        self.last_quote[symbol] = new_quote
    else:
        new_quote = random.uniform(10.0, 250.0)
        self.last_quote[symbol] = new_quote
    self.counter += 1
    return (symbol, self.last_quote[symbol], time.gmtime(), self.counter)

def monitor(self):
    while True:
        quote = self.get_quote()
        print("New quote is %s" % str(quote))
        self.publisher.publish(pickle.dumps((quote[0], quote[1], time.strftime(self.time_format, quote[2]), quote[3])), routing_key="")
        secs = random.uniform(0.1, 0.5)
        #print("Sleeping %s seconds..." % secs)
        time.sleep(secs)

This application randomly creates four stock symbols, and then starts to create quotes. It initially picks a random value between 10.0 and 250.0, then proceeds to randomly adjust the price between 90% and 110% of the previous price. It then randomly waits between 0.1 and 0.5 seconds before ticking off the next quote. An important part of this code design is the fact that publishing to an AMQP broker has been decoupled from the stock ticker. Instead, it expects a publisher service to be injected when constructed.

Its important to note that we are using pickle to serialize our tuple of stock quote data. In AMQP, the body of the message is just a series of bytes. What is stored and how it is serialized is not part of the spec, and instead must be agreed upon between sender and receiver. In our situation, the publisher and subscriber have both agreed upon it containing a pickled tuple.
Creating an AMQP service

The next step is to create our AMQP client service. Its purpose is to make it easy for us to properly isolate talking to the AMQP server, either through publishing or by consuming events.

from amqplib import client_0_8 as amqp

class PyAmqpLibPublisher(object):
def init(self, exchange_name):
self.exchange_name = exchange_name
self.queue_exists = False

def publish(self, message, routing_key):
    conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest", virtual_host="/", insist=False)

    ch = conn.channel()

    ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)

    msg = amqp.Message(message)
    msg.properties["content_type"] = "text/plain"
    msg.properties["delivery_mode"] = 2
    ch.basic_publish(exchange=self.exchange_name,
                     routing_key=routing_key,
                     msg=msg)
    ch.close()
    conn.close()

An important thing to notice here is that the declared exchange is type “fanout”. This means that every queue that binds to it will receive a copy of the message without expensive processing on the broker’s end.

You may be wondering why the content_type of the body is “text/plain”, considering it’s a serialized message. This is because Python’s pickle library encodes data in an ASCII-armored format that is viewable with any tool without causing weird behavior.
Buy low, sell high

Some simple, sage advice is to buy when the price is low and sell it when the price is high. Here we’ll look at a simple client that subscribes for stock quotes, collects a historical trend of prices to figure out whether the next price is on the low end or high end, and then decides to buy or sell.

import pickle
import random
import uuid

class Buyer(object):
def init(self, client, qname, trend=5):
self.holdings = {}
self.cash = 100000.0
self.history = {}
self.qname = qname
self.client = client
self.trend = trend
self.qname = uuid.uuid4().hex

def decide_whether_to_buy_or_sell(self, quote):
    symbol, price, date, counter = quote
    #print "Thinking about whether to buy or sell %s at %s" % (symbol, price)

    if symbol not in self.history:
        self.history[symbol] = [price]
    else:
        self.history[symbol].append(price)

    if len(self.history[symbol]) >= self.trend:
        price_low = min(self.history[symbol][-self.trend:])
        price_max = max(self.history[symbol][-self.trend:])
        price_avg = sum(self.history[symbol][-self.trend:])/self.trend
        #print "Recent history of %s is %s" % (symbol, self.history[symbol][-self.trend:])
    else:
        price_low, price_max, price_avg = (-1, -1, -1)
        print "%s quotes until we start deciding whether to buy or sell %s" % (self.trend - len(self.history[symbol]), symbol)
        #print "Recent history of %s is %s" % (symbol, self.history[symbol])

    if price_low == -1: return

    #print "Trending minimum/avg/max of %s is %s-%s-%s" % (symbol, price_low, price_avg, price_max)
    #for symbol in self.holdings.keys():
    #    print "self.history[symbol][-1] = %s" % self.history[symbol][-1]
    #    print "self.holdings[symbol][0] = %s" % self.holdings[symbol][0]
    #    print "Value of %s is %s" % (symbol, float(self.holdings[symbol][0])*self.history[symbol][-1])
    value = sum([self.holdings[symbol][0]*self.history[symbol][-1] for symbol in self.holdings.keys()])
    print "Net worth is %s + %s = %s" % (self.cash, value, self.cash + value)

    if symbol not in self.holdings:
        if price < 1.01*price_low:
            shares_to_buy = random.choice([10, 15, 20, 25, 30])
            print "I don't own any %s yet, and the price is below the trending minimum of %s so I'm buying %s shares." % (symbol, price_low, shares_to_buy)
            cost = shares_to_buy * price
            print "Cost is %s, cash is %s" % (cost, self.cash)
            if cost < self.cash:
                self.holdings[symbol] = (shares_to_buy, price, cost)
                self.cash -= cost
                print "Cash is now %s" % self.cash
            else:
                print "Unfortunately, I don't have enough cash at this time."
    else:
        if price > self.holdings[symbol][1] and price > 0.99*price_max:
            print "+++++++ Price of %s is higher than my holdings, so I'm going to sell!" % symbol
            sale_value = self.holdings[symbol][0] * price
            print "Sale value is %s" % sale_value
            print "Holdings value is %s" % self.holdings[symbol][2]
            print "Total net is %s" % (sale_value - self.holdings[symbol][2])
            self.cash += sale_value
            print "Cash is now %s" % self.cash
            del self.holdings[symbol]

def handle_pyamqplib_delivery(self, msg):
    self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)

def handle(self, ch, delivery_tag, body):
    quote = pickle.loads(body)
    #print "New price for %s => %s at %s" % quote
    ch.basic_ack(delivery_tag = delivery_tag)
    print "Received message %s" % quote[3]
    self.decide_whether_to_buy_or_sell(quote)

def monitor(self):
    self.client.monitor(self.qname, self.handle_pyamqplib_delivery)

This client has its strategy for buying and selling stocks nicely isolated from the machinery for receiving messages from RabbitMQ.

monitor is the main hook to kick off listening for new stock quotes. It registers handle_pyamqplib_delivery as the callback method to invoke every time a new quote arrives.

handle_pyamqplib_delivery pulls out the important parts of the message and hands them to handle. The reason for inserting this extra method call is to support swapping out py-amqplib with pika, which we’ll look at later.

handle de-pickles the opaque body of the message, acknowledges receipt of the message on the channel with the broker, and then fires off its algorithm about deciding whether to buy or sell.

decide_whether_to_buy_or_sell splits up the tuple of the stock quote and then adds the price to its stock symbol history. It’s geared to collect a minimum number of quotes before making a decision. Wouldn’t you? Then it calculates the minimum and maximum of the trend, and if the price is relatively close to the minimum, it buys. However, if it already has shares, then it waits for the price to rise above what it originally paid for it. When that happens, it sells.

The part missing from this is the self.client.monitor function. self.client is our hook on the AMQP service coded earlier, and we need a way to bind our queue to the exchange to receive messages. The following function needs to be added to PyAmqpLibPublisher.

def monitor(self, qname, callback):
    conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest")

    ch = conn.channel()

    if not self.queue_exists:
        ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
        ch.queue_bind(queue=qname, exchange=self.exchange_name)
        print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
        #ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
        self.queue_exists = True

    ch.basic_consume(callback=callback, queue=qname)

    while True:
        ch.wait()
    print 'Close reason:', conn.connection_close

This shows the basic pattern of connecting to our RabbitMQ broker, declaring a queue, binding it to the fanout exchange, and then registering a callback.

But let’s not getting all wrapped up in how to make this algorithm better at picking winners and losers. Instead, let’s recognize that this makes it very easy for any financial company to subscribe for stock quotes by creating a unique queue, binding to the stock system’s fanout exchange, and then write their own algorithm for making financial decision.
Replacing py-amqplib with pika

AMQP is a nicely written spec. It includes an XML format, supporting the ability to automatically generate client libraries. This means that libraries that were coded to spec are easy to swap out, and pick based on the merits of their implementation. A popular library in the Python community is py-amqplib. One of its limitations, as noted on its project site, is that it blocks and doesn’t currently provide concurrency. pika offers both.

The important point is that migrating from py-amqplib to pika is actually quite easy. The AMQP-based methods are the same, and the underpinning concepts are the same as well. Let’s look at writing an alternative AMQP service using pika.

import pika

class PikaPublisher(object):
def init(self, exchange_name):
self.exchange_name = exchange_name
self.queue_exists = False

def publish(self, message, routing_key):
    conn = pika.AsyncoreConnection(pika.ConnectionParameters(
            '127.0.0.1',
            credentials=pika.PlainCredentials('guest', 'guest')))

    ch = conn.channel()

    ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)

    ch.basic_publish(exchange=self.exchange_name,
                     routing_key=routing_key,
                     body=message,
                     properties=pika.BasicProperties(
                            content_type = "text/plain",
                            delivery_mode = 2, # persistent
                            ),
                     block_on_flow_control = True)
    ch.close()
    conn.close()

def monitor(self, qname, callback):
    conn = pika.AsyncoreConnection(pika.ConnectionParameters(
            '127.0.0.1',
            credentials=pika.PlainCredentials('guest', 'guest')))

    ch = conn.channel()

    if not self.queue_exists:
        ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
        ch.queue_bind(queue=qname, exchange=self.exchange_name)
        print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
        #ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
        self.queue_exists = True

    ch.basic_consume(callback, queue=qname)

    pika.asyncore_loop()
    print 'Close reason:', conn.connection_close

This is very similar to the other service shown earlier. Creating a connection is a little different, but included the same pieces of data such as host for the broker, and username and password. basic_publish is slightly different in that the message and its properties are put together inside the method call. py-amqplib declares the entire message and its properties in a slightly different structure, and then passes it into basic_publish as one argument. Good thing about the spec is knowing that all the important pieces are in both libraries.

pika supports different waiting mechanisms compared to py-amqplib. py-amqplib has a blocking wait, while pika offers both a blocking mechanism as well as one that uses Python’s asyncore utility for asynchronous operations. We can explore this in future blog entries about RabbitMQ and Python.

There is a slight difference in the method signature of the callback between these two libraries. We need to update our brokerage client to handle it suitably.

def handle_pyamqplib_delivery(self, msg):
    self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)

Compare this with pika’s callback method signature.

def handle_pika_delivery(self, ch, method, header, body):
    self.handle(ch, delivery_tag, body)

They are very close. The important parts are there. The difference is based on the fact that pika splits up the parts of the message while py-amqplib combines it all inside a single class. This is the reason there is a decoupling between the callback method and the actual method that extracts the body of our message. By extracting the necessary parts, it is possible to switch between these two libraries without rewriting our buy/sell algorithm.
Running things

With all this code, we need to run things. It’s easy to code a runner script and just spin things up.

########################################

To run this demo using py-amqplib,

uncomment this block, and comment out

the next block.

########################################

#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name=“my_exchange”)

########################################

To run this demo using pika,

uncomment this block, and comment out

the previous block

########################################

from pika_client import *
publisher = PikaPublisher(exchange_name=“my_exchange”)

########################################

This part doesn’t have to change

########################################

from ticker_system import *
ticker = Ticker(publisher, “”)
ticker.monitor()

This runner can be switched between running either the py-amqplib or pika version of our stock ticker system. Now we just need a runner for the brokerage service.

########################################

To run this demo using py-amqplib,

uncomment this block, and comment out

the next block.

########################################

#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name=“my_exchange”)

########################################

To run this demo using pika,

uncomment this block, and comment out

the previous block

########################################

from pika_client import *
publisher = PikaPublisher(exchange_name=“my_exchange”)

########################################

This part doesn’t have to change

########################################

from buy_low_sell_high import *
buyer = Buyer(publisher, “”, trend=25)
print “Buyer = %s” % id(buyer)
buyer.monitor()

In future blog entries, we can entertain running the same code using a pythonic DI container.

A good spec provides great options

The AMQP spec makes it easy to pick libraries based on more than just technical merit. By splitting up the machinery of AMQP from the logic of generating quotes as well as parsing them, it was pretty easy to swap out py-amqplib and pika. The core method names are the same. Several arguments are the same. But even more important: the architectural concepts are the same. The choice about which library to choose can now include not just technical merit, but instead things like customer support, spec compliance, synchronous vs. asynchronous support, and usability.

comments powered by Disqus
在这里插入图片描述

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值