Python操作消息队列RabbitMQ的完整指南
在现代分布式系统中,消息队列如同人体的神经系统,负责在各个服务之间可靠、高效地传递信息。RabbitMQ 作为 AMQP(高级消息队列协议)的标杆实现,凭借其灵活的路由、可靠的消息投递和丰富的生态,成为了架构师工具箱中的必备品。本文作为 Python 中间件系列 的开篇,将带你从零开始,用 Python 操作 RabbitMQ,从 “Hello World” 直连模式讲到延迟队列、RPC 等高级场景,所有示例均附带可直接运行的代码和详尽的控制台输出解析。
1. 核心概念速览
在敲代码之前,先在大脑中建立一张地图。RabbitMQ 的核心由三个角色和两个关键组件构成:
- 生产者 (Producer):发送消息的应用程序。
- 消费者 (Consumer):接收并处理消息的应用程序。
- 队列 (Queue):RabbitMQ 内部的缓冲区,用于存储消息。消息一旦进入队列,就由 RabbitMQ 负责保管,直到消费者取走。
- 交换机 (Exchange):生产者不会直接把消息发到队列,而是发给交换机。交换机根据规则,决定消息路由到一个或多个队列。主要有四种类型:
direct、fanout、topic、headers。 - 绑定 (Binding):队列和交换机之间的“连线”,在绑定时会指定路由键 (Routing Key)。交换机根据路由键和自身类型,决定把消息投递到哪些绑定的队列。
一句话总结:生产者 -> 交换机 -(通过绑定和路由键)-> 队列 <- 消费者
理解了这张图,后面的代码就只是把它翻译成 Python 语言而已。
2. 环境准备
我们使用 Docker 快速启动 RabbitMQ,并安装 Python 客户端 pika。
启动 RabbitMQ(带管理界面的版本):
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
启动后,可通过 http://localhost:15672 访问管理界面,默认账号密码 guest/guest。
安装 pika 库:
接下来,所有代码示例都将围绕这两个环境展开。
3. 第一章:Hello World —— 最简单的消息模型
我们先从一个最简单的 单生产者 -> 单消费者 模型开始,发送一句 “Hello World”。
生产者send.py
import pika
# 1. 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 2. 声明队列(如果队列不存在则创建,存在则复用)
channel.queue_declare(queue='hello')
# 3. 发布消息
channel.basic_publish(exchange='', # 使用默认交换机
routing_key='hello', # 消息直接发到 'hello' 队列
body='Hello World!')
print(" [x] Sent 'Hello World!'")
# 4. 关闭连接
connection.close()
消费者receive.py
import pika
# 1. 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 2. 声明队列(幂等操作,确保队列存在)
channel.queue_declare(queue='hello')
# 3. 定义回调函数
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 4. 订阅队列
channel.basic_consume(queue='hello',
auto_ack=True, # 自动确认(先这样,后面会讲)
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
运行与输出解析
打开两个终端:
终端 1(消费者):
$ python receive.py [*] Waiting for messages. To exit press CTRL+C [x] Received Hello World!
终端 2(生产者):
$ python send.py [x] Sent 'Hello World!'
这里的关键点:
- 生产者使用默认交换机(空字符串
''),此时routing_key就是目标队列名。 - 消费者用
basic_consume持续监听队列,auto_ack=True表示消息一旦送出就自动确认删除。
4. 第二章:工作队列 —— 任务的分发与负载均衡
真实世界中,单个消费者往往忙不过来。工作队列允许多个消费者从同一个队列中取任务,消息只被一个消费者处理。
循环分发(Round-robin)
默认情况下,RabbitMQ 会把消息轮流发给所有消费者。我们创建持续发送任务的 new_task.py 和多个 worker.py。
生产者 new_task.py:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
for i in range(1, 6):
message = f"Task {i}"
channel.basic_publish(exchange='', routing_key='task_queue', body=message)
print(f" [x] Sent '{message}'")
time.sleep(0.5)
connection.close()
消费者 worker.py:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 模拟处理耗时(偶数任务耗时短,奇数耗时长)
time.sleep(2 if int(body.decode().split()[1]) % 2 != 0 else 0.5)
print(f" [x] Done {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认
# 重要:每次只分发一条消息,消费者处理完并确认后才发下一条
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages...')
channel.start_consuming()
运行与公平分发演示
打开三个终端:一个生产,两个消费。
终端 1(Worker A):
$ python worker.py [*] Waiting for messages... [x] Received Task 1 # 耗时2秒 [x] Done Task 1 [x] Received Task 3 # 耗时2秒 [x] Done Task 3 [x] Received Task 5 # 耗时2秒
终端 2(Worker B):
$ python worker.py [*] Waiting for messages... [x] Received Task 2 # 耗时0.5秒,先完成 [x] Done Task 2 [x] Received Task 4 # 耗时0.5秒 [x] Done Task 4
终端 3(生产者):
$ python new_task.py [x] Sent 'Task 1' [x] Sent 'Task 2' [x] Sent 'Task 3' [x] Sent 'Task 4' [x] Sent 'Task 5'
输出解读:
- 我们没有使用自动确认
auto_ack,而是手动basic_ack。这保证了如果 Worker A 在处理 Task 1 时崩溃,该消息会重新入队并分发给 Worker B。 basic_qos(prefetch_count=1)是关键。它告诉 RabbitMQ:不要同时给我超过 1 条消息。这样 Worker A 在处理 Task 1 时,尽管 Task 2、3 已经入队,但 Task 3 不会预发给 A,而是会发给空闲的 Worker B。这就实现了公平分发,处理速度快的消费者将拿到更多任务。
5. 第三章:发布/订阅 —— 用 Fanout 交换机广播消息
现在,我们需要让多个消费者都能收到同一条消息,就像日志广播。这需要引入交换机。fanout 交换机将消息广播到所有绑定的队列。
生产者emit_log.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = "info: Hello Fanout!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()
消费者receive_logs.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 让 RabbitMQ 生成一个唯一的、临时队列,消费者断开后自动删除
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 绑定队列到交换机
channel.queue_bind(exchange='logs', queue=queue_name)
print(f' [*] Waiting for logs on queue: {queue_name}')
def callback(ch, method, properties, body):
print(f" [x] {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
运行演示:一发多收
同时启动两个 receive_logs.py,再执行 emit_log.py。
消费者终端 1:
$ python receive_logs.py [*] Waiting for logs on queue: amq.gen-JzTY20BRgKO-HjmUJj0wLg [x] info: Hello Fanout!
消费者终端 2:
$ python receive_logs.py [*] Waiting for logs on queue: amq.gen-0cHw5VhC7KnpjRzAsj0Xww [x] info: Hello Fanout!
生产者终端:
$ python emit_log.py [x] Sent info: Hello Fanout!
可见,两个消费者都收到了相同的消息。关键在于 queue_declare(queue='', exclusive=True):每个消费者启动时都会创建一个随机的独占临时队列,并绑定到 logs 交换机。生产者完全不需要关心消费者的数量和地址,达到了彻底的解耦。
6. 第四章:路由模式 —— 用 Direct 交换机精准投递
Fanout 是“无脑广播”,而 direct 交换机根据完全匹配的路由键,将消息投递给绑定键相同的队列。适用于按日志级别(error、info)分发。
生产者emit_direct_log.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()
消费者receive_direct_log.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 通过命令行参数指定绑定的路由键,如 python receive_direct_log.py error info
severities = sys.argv[1:] if len(sys.argv) > 1 else ['info']
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print(f' [*] Waiting for {severities} logs. Queue: {queue_name}')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
运行演示
终端 1(只收 error):
$ python receive_direct_log.py error [*] Waiting for ['error'] logs. Queue: amq.gen-XXX
终端 2(收 error 和 info):
$ python receive_direct_log.py error info [*] Waiting for ['error', 'info'] logs. Queue: amq.gen-YYY
生产者发送消息:
$ python emit_direct_log.py error "Disk full" [x] Sent error:Disk full $ python emit_direct_log.py info "Server started" [x] Sent info:Server started
输出:终端1只收到 error 消息,终端2则两条都收到。 这种精确匹配非常适合按模块、优先级区分处理。
7. 第五章:主题模式 —— 用 Topic 交换机实现灵活匹配
topic 交换机是 direct 的升级版。路由键是由点分隔的单词列表(如 “weather.us.east”),绑定键可以使用通配符:
*匹配刚好一个单词。#匹配零个或多个单词。
生产者emit_topic.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
消费者receive_topic.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# 绑定键从命令行接收,例如: "kern.*" 或 "*.critical" 或 "#"
binding_keys = sys.argv[1:] if len(sys.argv) > 1 else ['anonymous.*']
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(f' [*] Waiting for logs. Binding keys: {binding_keys}. Queue: {queue_name}')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
运行演示
消费者 1:订阅所有 kern 开头的日志
$ python receive_topic.py "kern.*" [*] Waiting for logs. Binding: ['kern.*']
消费者 2:订阅所有 critical 结尾的日志
$ python receive_topic.py "*.critical" [*] Waiting for logs. Binding: ['*.critical']
消费者 3:接收所有日志(#)
$ python receive_topic.py "#" [*] Waiting for logs. Binding: ['#'] **生产者:** ```bash $ python emit_topic.py kern.critical "Kernel panic" [x] Sent kern.critical:Kernel panic $ python emit_topic.py app.critical "App crash" [x] Sent app.critical:App crash $ python emit_topic.py kern.info "Kernel info" [x] Sent kern.info:Kernel info
结果:
kern.critical会被三个消费者都收到(匹配kern.*,*.critical,#)。app.critical仅被消费者2和3收到。kern.info仅被消费者1和3收到。
Topic 交换机提供了极其强大的基于模式的消息路由,是构建事件驱动架构的利器。
8. 第六章:消息可靠性 —— 确认、持久化与发布者确认
生产环境中,消息不能丢。RabbitMQ 提供了三重保障:
- 消费者确认(Acknowledgement):消费者告诉 RabbitMQ,消息已成功处理。我们已在工作队列中使用
basic_ack。 - 队列持久化(Durable):RabbitMQ 重启后队列不丢失。
queue_declare(queue='task_queue', durable=True)。 - 消息持久化:消息本身标记为持久,写入磁盘。
properties=pika.BasicProperties(delivery_mode=2)。 - 发布者确认(Publisher Confirms):生产者知道消息是否成功到达 RabbitMQ。
我们来改造工作队列,加入发布者确认和持久化。
可靠生产者reliable_send.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 启用发布者确认
channel.confirm_delivery()
# 声明持久化队列
channel.queue_declare(queue='durable_task', durable=True)
for i in range(1, 4):
message = f"Persistent Task {i}"
# 将消息标记为持久化
properties = pika.BasicProperties(delivery_mode=2)
try:
channel.basic_publish(exchange='',
routing_key='durable_task',
body=message,
properties=properties)
print(f" [x] Confirmed: {message}")
except pika.exceptions.UnroutableError:
print(f" [x] Failed to route: {message}")
connection.close()
可靠消费者reliable_worker.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='durable_task', durable=True)
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 模拟处理
import time
time.sleep(1)
print(f" [x] Done {body.decode()}")
# 手动确认
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='durable_task', on_message_callback=callback)
print(' [*] Waiting for durable tasks...')
channel.start_consuming()
运行演示
先启动 reliable_worker.py,再执行 reliable_send.py:
生产者输出:
$ python reliable_send.py [x] Confirmed: Persistent Task 1 [x] Confirmed: Persistent Task 2 [x] Confirmed: Persistent Task 3
现在即使重启 RabbitMQ(docker restart rabbitmq),队列和尚未被消费的持久化消息也不会丢失。
提示:持久化有性能开销,只对关键消息使用。
9. 第七章:高级应用 —— 死信队列与延迟队列
如何实现“订单30分钟未支付自动取消”这样的延迟任务?RabbitMQ 本身没有延迟队列,但可以用 死信交换机(DLX) 和 消息TTL(存活时间) 组合实现。
原理
- 给队列设置
x-dead-letter-exchange和x-dead-letter-routing-key。当消息在该队列中变成死信(被拒绝、过期或队列满)时,会被自动转发到死信交换机。 - 给消息设置
expiration(毫秒)。消息过期后会变成死信,从而被投递到指定的延迟队列。
延迟队列代码delay_queue.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 1. 定义死信交换机
dlx_exchange = 'dlx_exchange'
channel.exchange_declare(exchange=dlx_exchange, exchange_type='direct')
# 2. 死信队列(真正延迟后消费的队列)
dlx_queue = 'delayed_queue'
channel.queue_declare(queue=dlx_queue)
channel.queue_bind(exchange=dlx_exchange, queue=dlx_queue, routing_key='delayed_key')
# 3. 普通队列,设置死信参数和队列TTL(也可针对单独消息设TTL)
args = {
'x-dead-letter-exchange': dlx_exchange,
'x-dead-letter-routing-key': 'delayed_key',
# 'x-message-ttl': 5000 # 统一队列TTL 5秒,这里用消息TTL演示
}
normal_queue = 'normal_queue'
channel.queue_declare(queue=normal_queue, arguments=args)
# 生产者发送一条TTL为5秒的消息
message = "Delayed order cancel"
properties = pika.BasicProperties(expiration='5000') # 消息TTL 5秒
channel.basic_publish(exchange='', routing_key=normal_queue, body=message, properties=properties)
print(f" [x] Sent to normal_queue with 5s TTL: {message}")
# 消费者监听死信队列(即延迟后的队列)
def consume_delayed(ch, method, properties, body):
print(f" [x] Received delayed message at {time.strftime('%X')}: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
import time
print(f" [*] Start time: {time.strftime('%X')}")
channel.basic_consume(queue=dlx_queue, on_message_callback=consume_delayed)
channel.start_consuming()
运行与输出
$ python delay_queue.py [x] Sent to normal_queue with 5s TTL: Delayed order cancel [*] Start time: 14:32:10 [x] Received delayed message at 14:32:15: Delayed order cancel
时间上正好相差5秒,延迟消费达成!这个模式广泛应用于定时触发、超时处理等业务。
10. 第八章:RPC —— 远程过程调用
RPC 允许客户端将请求消息发送到队列,然后阻塞等待服务器返回响应。实现的关键是 correlation_id(关联ID)和 reply_to(回调队列)。
服务器rpc_server.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def on_request(ch, method, props, body):
n = int(body)
print(f" [.] fib({n})")
# 模拟计算斐波那契
response = fib(n)
# 将结果发回给 props.reply_to 指定的回调队列
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
def fib(n):
if n == 0: return 0
elif n == 1: return 1
else: return fib(n-1) + fib(n-2)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
客户端rpc_client.py
import pika
import uuid
class FibonacciRpcClient:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
# 创建唯一的回调队列
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body.decode()
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
# 等待响应
while self.response is None:
self.connection.process_data_events()
return int(self.response)
# 使用客户端
fib_client = FibonacciRpcClient()
print(" [x] Requesting fib(10)")
response = fib_client.call(10)
print(f" [.] Got {response}")
运行与输出
服务器:
$ python rpc_server.py [x] Awaiting RPC requests [.] fib(10)
客户端:
$ python rpc_client.py [x] Requesting fib(10) [.] Got 55
客户端发送请求后,阻塞等待来自自己专属回调队列的响应,通过 correlation_id 精准匹配请求与响应。这是典型的消息同步模式。
11. 结语与最佳实践
我们由浅入深地走完了 RabbitMQ 在 Python 中六大消息模式与高级特性。这些代码片段不仅是示例,更是可以直接复用到生产项目中的模板。最后,总结几条金科玉律:
- 多用临时队列,善用交换机:消费者尽量使用随机队列并绑定到交换机,实现组件间解耦。
- 生产者确认 + 消费者手动确认 + 持久化:三者缺一,高可靠无从谈起。
auto_ack仅在可容忍消息丢失的场景使用。 - 合理设置
prefetch_count:避免一个消费者被堆积的消息撑死,是实现公平调度的利器。 - 死信队列不仅是延迟任务:它还适用于收集处理异常的消息,方便排查和人工干预。
- 连接与通道管理:
BlockingConnection适合简单脚本,异步场景请用AsyncioConnection或TornadoConnection。生产环境务必配置心跳和自动重连。 - 监控为王:善用
http://localhost:15672管理界面,观察队列长度、消息速率、消费者数量,是调优和排错的眼睛。
消息队列是分布式系统的脊梁,而 RabbitMQ 这条脊梁足够强壮且灵活。掌握它,你就掌握了一种构建健壮、可扩展系统的核心能力。
以上就是Python操作消息队列RabbitMQ的完整指南的详细内容,更多关于Python操作消息队列RabbitMQ的资料请关注脚本之家其它相关文章!
相关文章
macOS M1(AppleSilicon) 安装TensorFlow环境
苹果为M1芯片的Mac提供了TensorFlow的支持,本文主要介绍了如何给使用M1芯片的macOS安装TensorFlow的环境,感兴趣的可以了解一下2021-08-08
Python如何把Spark数据写入ElasticSearch
这篇文章主要介绍了Python如何把Spark数据写入ElasticSearch,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2020-04-04


最新评论