Python操作消息队列RabbitMQ的完整指南

 更新时间:2026年05月13日 08:43:18   作者:IT策士  
在现代分布式系统中,消息队列如同人体的神经系统,负责在各个服务之间可靠,高效地传递信息,本文介绍了使用Python操作RabbitMQ消息队列的基础概念和高级应用场景,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下

在现代分布式系统中,消息队列如同人体的神经系统,负责在各个服务之间可靠、高效地传递信息。RabbitMQ 作为 AMQP(高级消息队列协议)的标杆实现,凭借其灵活的路由、可靠的消息投递和丰富的生态,成为了架构师工具箱中的必备品。本文作为 Python 中间件系列 的开篇,将带你从零开始,用 Python 操作 RabbitMQ,从 “Hello World” 直连模式讲到延迟队列、RPC 等高级场景,所有示例均附带可直接运行的代码和详尽的控制台输出解析。

1. 核心概念速览

在敲代码之前,先在大脑中建立一张地图。RabbitMQ 的核心由三个角色和两个关键组件构成:

  • 生产者 (Producer):发送消息的应用程序。
  • 消费者 (Consumer):接收并处理消息的应用程序。
  • 队列 (Queue):RabbitMQ 内部的缓冲区,用于存储消息。消息一旦进入队列,就由 RabbitMQ 负责保管,直到消费者取走。
  • 交换机 (Exchange):生产者不会直接把消息发到队列,而是发给交换机。交换机根据规则,决定消息路由到一个或多个队列。主要有四种类型:directfanouttopicheaders
  • 绑定 (Binding):队列和交换机之间的“连线”,在绑定时会指定路由键 (Routing Key)。交换机根据路由键和自身类型,决定把消息投递到哪些绑定的队列。

一句话总结:生产者 -> 交换机 -(通过绑定和路由键)-> 队列 <- 消费者

理解了这张图,后面的代码就只是把它翻译成 Python 语言而已。

2. 环境准备

我们使用 Docker 快速启动 RabbitMQ,并安装 Python 客户端 pika

启动 RabbitMQ(带管理界面的版本):

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

启动后,可通过 http://localhost:15672 访问管理界面,默认账号密码 guest/guest

安装 pika 库:

接下来,所有代码示例都将围绕这两个环境展开。

3. 第一章:Hello World —— 最简单的消息模型

我们先从一个最简单的 单生产者 -> 单消费者 模型开始,发送一句 “Hello World”。

生产者send.py

import pika

# 1. 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2. 声明队列(如果队列不存在则创建,存在则复用)
channel.queue_declare(queue='hello')

# 3. 发布消息
channel.basic_publish(exchange='',      # 使用默认交换机
                      routing_key='hello', # 消息直接发到 'hello' 队列
                      body='Hello World!')

print(" [x] Sent 'Hello World!'")

# 4. 关闭连接
connection.close()

消费者receive.py

import pika

# 1. 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2. 声明队列(幂等操作,确保队列存在)
channel.queue_declare(queue='hello')

# 3. 定义回调函数
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")

# 4. 订阅队列
channel.basic_consume(queue='hello',
                      auto_ack=True,  # 自动确认(先这样,后面会讲)
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

运行与输出解析

打开两个终端:

终端 1(消费者):

$ python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received Hello World!

终端 2(生产者):

$ python send.py
 [x] Sent 'Hello World!'

这里的关键点:

  • 生产者使用默认交换机(空字符串 ''),此时 routing_key 就是目标队列名。
  • 消费者用 basic_consume 持续监听队列,auto_ack=True 表示消息一旦送出就自动确认删除。

4. 第二章:工作队列 —— 任务的分发与负载均衡

真实世界中,单个消费者往往忙不过来。工作队列允许多个消费者从同一个队列中取任务,消息只被一个消费者处理

循环分发(Round-robin)

默认情况下,RabbitMQ 会把消息轮流发给所有消费者。我们创建持续发送任务的 new_task.py 和多个 worker.py

生产者 new_task.py

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')

for i in range(1, 6):
    message = f"Task {i}"
    channel.basic_publish(exchange='', routing_key='task_queue', body=message)
    print(f" [x] Sent '{message}'")
    time.sleep(0.5)

connection.close()

消费者 worker.py

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')

def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    # 模拟处理耗时(偶数任务耗时短,奇数耗时长)
    time.sleep(2 if int(body.decode().split()[1]) % 2 != 0 else 0.5)
    print(f" [x] Done {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认

# 重要:每次只分发一条消息,消费者处理完并确认后才发下一条
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] Waiting for messages...')
channel.start_consuming()

运行与公平分发演示

打开三个终端:一个生产,两个消费。

终端 1(Worker A):

$ python worker.py
 [*] Waiting for messages...
 [x] Received Task 1     # 耗时2秒
 [x] Done Task 1
 [x] Received Task 3     # 耗时2秒
 [x] Done Task 3
 [x] Received Task 5     # 耗时2秒

终端 2(Worker B):

$ python worker.py
 [*] Waiting for messages...
 [x] Received Task 2     # 耗时0.5秒,先完成
 [x] Done Task 2
 [x] Received Task 4     # 耗时0.5秒
 [x] Done Task 4

终端 3(生产者):

$ python new_task.py
 [x] Sent 'Task 1'
 [x] Sent 'Task 2'
 [x] Sent 'Task 3'
 [x] Sent 'Task 4'
 [x] Sent 'Task 5'

输出解读:

  • 我们没有使用自动确认 auto_ack,而是手动 basic_ack。这保证了如果 Worker A 在处理 Task 1 时崩溃,该消息会重新入队并分发给 Worker B。
  • basic_qos(prefetch_count=1) 是关键。它告诉 RabbitMQ:不要同时给我超过 1 条消息。这样 Worker A 在处理 Task 1 时,尽管 Task 2、3 已经入队,但 Task 3 不会预发给 A,而是会发给空闲的 Worker B。这就实现了公平分发,处理速度快的消费者将拿到更多任务。

5. 第三章:发布/订阅 —— 用 Fanout 交换机广播消息

现在,我们需要让多个消费者都能收到同一条消息,就像日志广播。这需要引入交换机fanout 交换机将消息广播到所有绑定的队列

生产者emit_log.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "info: Hello Fanout!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")

connection.close()

消费者receive_logs.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 让 RabbitMQ 生成一个唯一的、临时队列,消费者断开后自动删除
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 绑定队列到交换机
channel.queue_bind(exchange='logs', queue=queue_name)

print(f' [*] Waiting for logs on queue: {queue_name}')

def callback(ch, method, properties, body):
    print(f" [x] {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

运行演示:一发多收

同时启动两个 receive_logs.py,再执行 emit_log.py

消费者终端 1:

$ python receive_logs.py
 [*] Waiting for logs on queue: amq.gen-JzTY20BRgKO-HjmUJj0wLg
 [x] info: Hello Fanout!

消费者终端 2:

$ python receive_logs.py
 [*] Waiting for logs on queue: amq.gen-0cHw5VhC7KnpjRzAsj0Xww
 [x] info: Hello Fanout!

生产者终端:

$ python emit_log.py
 [x] Sent info: Hello Fanout!

可见,两个消费者都收到了相同的消息。关键在于 queue_declare(queue='', exclusive=True):每个消费者启动时都会创建一个随机的独占临时队列,并绑定到 logs 交换机。生产者完全不需要关心消费者的数量和地址,达到了彻底的解耦。

6. 第四章:路由模式 —— 用 Direct 交换机精准投递

Fanout 是“无脑广播”,而 direct 交换机根据完全匹配的路由键,将消息投递给绑定键相同的队列。适用于按日志级别(error、info)分发。

生产者emit_direct_log.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print(f" [x] Sent {severity}:{message}")
connection.close()

消费者receive_direct_log.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 通过命令行参数指定绑定的路由键,如 python receive_direct_log.py error info
severities = sys.argv[1:] if len(sys.argv) > 1 else ['info']
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(f' [*] Waiting for {severities} logs. Queue: {queue_name}')

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

运行演示

终端 1(只收 error):

$ python receive_direct_log.py error
 [*] Waiting for ['error'] logs. Queue: amq.gen-XXX

终端 2(收 error 和 info):

$ python receive_direct_log.py error info
 [*] Waiting for ['error', 'info'] logs. Queue: amq.gen-YYY

生产者发送消息:

$ python emit_direct_log.py error "Disk full"
 [x] Sent error:Disk full
$ python emit_direct_log.py info "Server started"
 [x] Sent info:Server started

输出:终端1只收到 error 消息,终端2则两条都收到。 这种精确匹配非常适合按模块、优先级区分处理。

7. 第五章:主题模式 —— 用 Topic 交换机实现灵活匹配

topic 交换机是 direct 的升级版。路由键是由点分隔的单词列表(如 “weather.us.east”),绑定键可以使用通配符:

  • * 匹配刚好一个单词。
  • # 匹配零个或多个单词。

生产者emit_topic.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

消费者receive_topic.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# 绑定键从命令行接收,例如: "kern.*" 或 "*.critical" 或 "#"
binding_keys = sys.argv[1:] if len(sys.argv) > 1 else ['anonymous.*']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(f' [*] Waiting for logs. Binding keys: {binding_keys}. Queue: {queue_name}')

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

运行演示

消费者 1:订阅所有 kern 开头的日志

$ python receive_topic.py "kern.*"
 [*] Waiting for logs. Binding: ['kern.*']

消费者 2:订阅所有 critical 结尾的日志

$ python receive_topic.py "*.critical"
 [*] Waiting for logs. Binding: ['*.critical']

消费者 3:接收所有日志(#

$ python receive_topic.py "#"
 [*] Waiting for logs. Binding: ['#']
**生产者:**
```bash
$ python emit_topic.py kern.critical "Kernel panic"
 [x] Sent kern.critical:Kernel panic
$ python emit_topic.py app.critical "App crash"
 [x] Sent app.critical:App crash
$ python emit_topic.py kern.info "Kernel info"
 [x] Sent kern.info:Kernel info

结果:

  • kern.critical 会被三个消费者都收到(匹配 kern.*, *.critical, #)。
  • app.critical 仅被消费者2和3收到。
  • kern.info 仅被消费者1和3收到。

Topic 交换机提供了极其强大的基于模式的消息路由,是构建事件驱动架构的利器。

8. 第六章:消息可靠性 —— 确认、持久化与发布者确认

生产环境中,消息不能丢。RabbitMQ 提供了三重保障:

  • 消费者确认(Acknowledgement):消费者告诉 RabbitMQ,消息已成功处理。我们已在工作队列中使用 basic_ack
  • 队列持久化(Durable):RabbitMQ 重启后队列不丢失。queue_declare(queue='task_queue', durable=True)
  • 消息持久化:消息本身标记为持久,写入磁盘。properties=pika.BasicProperties(delivery_mode=2)
  • 发布者确认(Publisher Confirms):生产者知道消息是否成功到达 RabbitMQ。

我们来改造工作队列,加入发布者确认和持久化。

可靠生产者reliable_send.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 启用发布者确认
channel.confirm_delivery()

# 声明持久化队列
channel.queue_declare(queue='durable_task', durable=True)

for i in range(1, 4):
    message = f"Persistent Task {i}"
    # 将消息标记为持久化
    properties = pika.BasicProperties(delivery_mode=2)
    try:
        channel.basic_publish(exchange='',
                              routing_key='durable_task',
                              body=message,
                              properties=properties)
        print(f" [x] Confirmed: {message}")
    except pika.exceptions.UnroutableError:
        print(f" [x] Failed to route: {message}")

connection.close()

可靠消费者reliable_worker.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='durable_task', durable=True)

def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    # 模拟处理
    import time
    time.sleep(1)
    print(f" [x] Done {body.decode()}")
    # 手动确认
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='durable_task', on_message_callback=callback)

print(' [*] Waiting for durable tasks...')
channel.start_consuming()

运行演示

先启动 reliable_worker.py,再执行 reliable_send.py

生产者输出:

$ python reliable_send.py
 [x] Confirmed: Persistent Task 1
 [x] Confirmed: Persistent Task 2
 [x] Confirmed: Persistent Task 3

现在即使重启 RabbitMQ(docker restart rabbitmq),队列和尚未被消费的持久化消息也不会丢失。

提示:持久化有性能开销,只对关键消息使用。

9. 第七章:高级应用 —— 死信队列与延迟队列

如何实现“订单30分钟未支付自动取消”这样的延迟任务?RabbitMQ 本身没有延迟队列,但可以用 死信交换机(DLX)消息TTL(存活时间) 组合实现。

原理

  • 给队列设置 x-dead-letter-exchangex-dead-letter-routing-key。当消息在该队列中变成死信(被拒绝、过期或队列满)时,会被自动转发到死信交换机。
  • 给消息设置 expiration (毫秒)。消息过期后会变成死信,从而被投递到指定的延迟队列。

延迟队列代码delay_queue.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. 定义死信交换机
dlx_exchange = 'dlx_exchange'
channel.exchange_declare(exchange=dlx_exchange, exchange_type='direct')

# 2. 死信队列(真正延迟后消费的队列)
dlx_queue = 'delayed_queue'
channel.queue_declare(queue=dlx_queue)
channel.queue_bind(exchange=dlx_exchange, queue=dlx_queue, routing_key='delayed_key')

# 3. 普通队列,设置死信参数和队列TTL(也可针对单独消息设TTL)
args = {
    'x-dead-letter-exchange': dlx_exchange,
    'x-dead-letter-routing-key': 'delayed_key',
    # 'x-message-ttl': 5000  # 统一队列TTL 5秒,这里用消息TTL演示
}
normal_queue = 'normal_queue'
channel.queue_declare(queue=normal_queue, arguments=args)

# 生产者发送一条TTL为5秒的消息
message = "Delayed order cancel"
properties = pika.BasicProperties(expiration='5000')  # 消息TTL 5秒
channel.basic_publish(exchange='', routing_key=normal_queue, body=message, properties=properties)
print(f" [x] Sent to normal_queue with 5s TTL: {message}")

# 消费者监听死信队列(即延迟后的队列)
def consume_delayed(ch, method, properties, body):
    print(f" [x] Received delayed message at {time.strftime('%X')}: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

import time
print(f" [*] Start time: {time.strftime('%X')}")
channel.basic_consume(queue=dlx_queue, on_message_callback=consume_delayed)
channel.start_consuming()

运行与输出

$ python delay_queue.py
 [x] Sent to normal_queue with 5s TTL: Delayed order cancel
 [*] Start time: 14:32:10
 [x] Received delayed message at 14:32:15: Delayed order cancel

时间上正好相差5秒,延迟消费达成!这个模式广泛应用于定时触发、超时处理等业务。

10. 第八章:RPC —— 远程过程调用

RPC 允许客户端将请求消息发送到队列,然后阻塞等待服务器返回响应。实现的关键是 correlation_id(关联ID)和 reply_to(回调队列)。

服务器rpc_server.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def on_request(ch, method, props, body):
    n = int(body)
    print(f" [.] fib({n})")
    # 模拟计算斐波那契
    response = fib(n)

    # 将结果发回给 props.reply_to 指定的回调队列
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

def fib(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fib(n-1) + fib(n-2)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

客户端rpc_client.py

import pika
import uuid

class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()

        # 创建唯一的回调队列
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body.decode()

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        # 等待响应
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

# 使用客户端
fib_client = FibonacciRpcClient()
print(" [x] Requesting fib(10)")
response = fib_client.call(10)
print(f" [.] Got {response}")

运行与输出

服务器:

$ python rpc_server.py
 [x] Awaiting RPC requests
 [.] fib(10)

客户端:

$ python rpc_client.py
 [x] Requesting fib(10)
 [.] Got 55

客户端发送请求后,阻塞等待来自自己专属回调队列的响应,通过 correlation_id 精准匹配请求与响应。这是典型的消息同步模式。

11. 结语与最佳实践

我们由浅入深地走完了 RabbitMQ 在 Python 中六大消息模式与高级特性。这些代码片段不仅是示例,更是可以直接复用到生产项目中的模板。最后,总结几条金科玉律:

  • 多用临时队列,善用交换机:消费者尽量使用随机队列并绑定到交换机,实现组件间解耦。
  • 生产者确认 + 消费者手动确认 + 持久化:三者缺一,高可靠无从谈起。auto_ack 仅在可容忍消息丢失的场景使用。
  • 合理设置 prefetch_count:避免一个消费者被堆积的消息撑死,是实现公平调度的利器。
  • 死信队列不仅是延迟任务:它还适用于收集处理异常的消息,方便排查和人工干预。
  • 连接与通道管理BlockingConnection 适合简单脚本,异步场景请用 AsyncioConnectionTornadoConnection。生产环境务必配置心跳自动重连
  • 监控为王:善用 http://localhost:15672 管理界面,观察队列长度、消息速率、消费者数量,是调优和排错的眼睛。

消息队列是分布式系统的脊梁,而 RabbitMQ 这条脊梁足够强壮且灵活。掌握它,你就掌握了一种构建健壮、可扩展系统的核心能力。

以上就是Python操作消息队列RabbitMQ的完整指南的详细内容,更多关于Python操作消息队列RabbitMQ的资料请关注脚本之家其它相关文章!

相关文章

  • python MysqlDb模块安装及其使用详解

    python MysqlDb模块安装及其使用详解

    本篇文章主要介绍了python MysqlDb模块安装及其使用详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-02-02
  • macOS M1(AppleSilicon) 安装TensorFlow环境

    macOS M1(AppleSilicon) 安装TensorFlow环境

    苹果为M1芯片的Mac提供了TensorFlow的支持,本文主要介绍了如何给使用M1芯片的macOS安装TensorFlow的环境,感兴趣的可以了解一下
    2021-08-08
  • Python网页解析器使用实例详解

    Python网页解析器使用实例详解

    这篇文章主要介绍了Python网页解析器使用实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • Python unittest 自动识别并执行测试用例方式

    Python unittest 自动识别并执行测试用例方式

    这篇文章主要介绍了Python unittest 自动识别并执行测试用例方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-03-03
  • Python矩阵常见运算操作实例总结

    Python矩阵常见运算操作实例总结

    这篇文章主要介绍了Python矩阵常见运算操作,结合实例形式总结分析了Python矩阵的创建以及相乘、求逆、转置等相关操作实现方法,需要的朋友可以参考下
    2017-09-09
  • python实现漫天飘落的七彩花朵效果

    python实现漫天飘落的七彩花朵效果

    要实现漫天飘落的七彩花朵效果,你可以使用Python的图形库,如Pygame或Pyglet,这些库可以帮助你创建动画和图形效果,本文给大家介绍了如何使用python实现漫天飘落的七彩花朵效果,感兴趣的朋友可以参考下
    2024-01-01
  • Python被远程主机强制关闭后自动重新运行进程的示例

    Python被远程主机强制关闭后自动重新运行进程的示例

    要实现Python程序在被远程主机强制关闭后能够自动重新运行,我们可以采用几种方法,但最直接且常用的方法之一是结合操作系统级的工具或脚本,这篇文章主要介绍了Python被远程主机强制关闭后怎么自动重新运行进程,需要的朋友可以参考下
    2024-08-08
  • Python如何统计函数调用的耗时

    Python如何统计函数调用的耗时

    这篇文章主要为大家详细介绍了如何使用Python实现统计函数调用的耗时,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-04-04
  • Python中的基本数据类型讲解

    Python中的基本数据类型讲解

    这篇文章介绍了Python中的基本数据类型,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-05-05
  • Python如何把Spark数据写入ElasticSearch

    Python如何把Spark数据写入ElasticSearch

    这篇文章主要介绍了Python如何把Spark数据写入ElasticSearch,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04

最新评论