使用Python与MQTT实现异步通信功能

 更新时间:2024年12月19日 17:21:12   作者:小噔小咚什么东东  
物联网(IoT)和实时通信的世界中,消息队列遥测传输(MQTT)协议因其轻量级、可靠性和实时性成为广受欢迎的选择,本文给大家介绍了使用Python与MQTT实现异步通信功能,需要的朋友可以参考下

什么是MQTT协议?

MQTT是一种轻量级的发布/订阅消息传输协议,设计用于低带宽和高延迟的网络环境,非常适合物联网设备之间的通信。其主要特点包括:

  • 发布/订阅模型:支持多对多的消息传递。
  • 轻量级设计:较低的网络开销。
  • 支持QoS等级:提供不同的消息传递可靠性。

项目背景

本文的示例代码实现了一个基于Python的MQTT客户端。以下功能涵盖在代码中:

  • 通过SSL安全连接到MQTT代理。
  • 支持动态订阅多个主题。
  • 异步处理消息,提高性能和扩展性。
  • 提供自定义消息处理功能。

核心代码解析

以下是代码中的主要功能与模块解析:

MQTT 客户端类

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

  • tls_set:启用SSL/TLS以确保通信安全。

  • 主题订阅:在连接成功时,自动订阅指定的主题。

自定义消息处理

def set_message_handler(self, handler):
    self.custom_message_handler = handler

用户可通过该方法传入自定义的回调函数,从而根据业务逻辑处理消息。

异步启动客户端

async def start_async(self):
    self.connect()
    await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever)

通过异步事件循环确保消息的高效处理,同时避免阻塞主线程。

示例代码集成

在主文件main.py中,定义了如下流程:

  • 初始化MQTT客户端并传入必要的参数。
  • 注册一个自定义的消息处理函数。
  • 利用asyncio实现消息处理和其他任务的并发执行。
async def on_mqtt_message(topic, payload):
    print(f"Custom handler: {topic} -> {payload}")

mqtt_client.set_message_handler(on_mqtt_message)
await mqtt_client.start_async()

使用指南

安装依赖

确保安装了paho-mqtt库:

pip install paho-mqtt

配置MQTT代理

更新代码中的代理地址、端口、用户名、密码和证书路径。

运行程序

使用以下命令运行程序:

python main.py

总结

快速搭建一个基于MQTT协议的实时通信系统。这种架构不仅适用于物联网场景,也可以在各种需要实时数据推送的应用中发挥作用,例如聊天应用和实时监控系统。

示例代码

mqtt.py

import paho.mqtt.client as mqtt
from datetime import datetime
import asyncio

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        """
        初始化 MQTT 客户端
        """
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.ca_cert = ca_cert
        self.topics = topics
        self.client = mqtt.Client()

        # 配置 MQTT 客户端
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.custom_message_handler = None  # 自定义消息处理器

    def set_message_handler(self, handler):
        """
        设置自定义消息处理回调函数
        """
        self.custom_message_handler = handler

    def on_connect(self, client, userdata, flags, rc):
        """
        连接成功时的回调
        """
        if rc == 0:
            print("SSL连接成功")
            for topic in self.topics:
                client.subscribe(topic)
                print(f"已订阅主题: {topic}")
        else:
            print(f"连接失败,返回码: {rc}")

    def on_message(self, client, userdata, msg):
        """
        收到消息时的回调
        """
        current_time = datetime.now()
        payload = msg.payload.decode()
        print(f"收到消息: {msg.topic} -> {payload} 时间: {current_time}")

        if self.custom_message_handler and self.event_loop:
            asyncio.run_coroutine_threadsafe(
                self.custom_message_handler(msg.topic, payload),
                self.event_loop
            )

    def connect(self):
        """
        连接到 MQTT 服务器
        """
        self.client.connect(self.broker, self.port, keepalive=60)

    async def start_async(self):
        """
        异步运行 MQTT 客户端
        """
        self.connect()  # 确保连接到 MQTT 服务器
        print("Starting MQTT client loop...")

        # 异步运行 MQTT 客户端的事件循环
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.client.loop_forever)

main.py

import asyncio
from mqtt import MQTTClient

# MQTT 配置
MQTT_BROKER = "你的服务器地址"
MQTT_PORT = 8883  # 使用 SSL 的端口
MQTT_USERNAME = "用户名"
MQTT_PASSWORD = "密码"
CA_CERT = "./emqxsl-ca.crt"  # CA 证书路径
TOPICS = ["clients/disconnect", "uhome/esp32"]  # 订阅的主题列表


async def main():
    loop = asyncio.get_running_loop()

    mqtt_client = MQTTClient(
        broker=MQTT_BROKER,
        port=MQTT_PORT,
        username=MQTT_USERNAME,
        password=MQTT_PASSWORD,
        ca_cert=CA_CERT,
        topics=TOPICS
    )

    async def on_mqtt_message(topic, payload):
        print(f"Custom handler: {topic} -> {payload}")

    mqtt_client.set_message_handler(on_mqtt_message)
    mqtt_client.event_loop = loop  # 将事件循环传递给 MQTT 客户端
    await mqtt_client.start_async()


    await asyncio.gather(websocket_task, periodic_task)


if __name__ == "__main__":
    asyncio.run(main())

到此这篇关于使用Python与MQTT实现异步通信功能的文章就介绍到这了,更多相关Python MQTT异步通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python抽样方法解读及实现过程

    python抽样方法解读及实现过程

    这篇文章主要介绍了python抽样方法解读及实现过程讲解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • qpython3 读取安卓lastpass Cookies

    qpython3 读取安卓lastpass Cookies

    这篇文章主要介绍了qpython3 读取安卓lastpass Cookies的相关资料,需要的朋友可以参考下
    2016-06-06
  • 详解Python中__new__和__init__的区别与联系

    详解Python中__new__和__init__的区别与联系

    在Python中,每个对象都有两个特殊的方法:__new__和__init__,本文将详细介绍这两个方法的不同之处以及它们之间的联系,具有一定的参考价值,感兴趣的可以了解一下
    2023-12-12
  • Python中的Django框架基础及安装教程

    Python中的Django框架基础及安装教程

    本文介绍了Django框架的基本概念、安装、项目创建和启动方法,以及路由配置、ORM思想和使用,Django是一个全栈框架,遵循MTV模式,提供自动化工具和ORM,简化了Web开发流程,感兴趣的朋友跟随小编一起看看吧
    2025-11-11
  • python多进程和多线程介绍

    python多进程和多线程介绍

    这篇文章主要介绍了python多进程和多线程,进程是分配资源的最小单位,线程是系统调度的最小单位,下文更多相关资料介绍,需要的小伙伴可以参考一下
    2022-04-04
  • Django Rest Framework实现身份认证源码详解

    Django Rest Framework实现身份认证源码详解

    这篇文章主要为大家介绍了Django Rest Framework实现身份认证源码详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • Python数据分析之绘制ppi-cpi剪刀差图形

    Python数据分析之绘制ppi-cpi剪刀差图形

    这篇文章主要介绍了Python数据分析之绘制ppi-cpi剪刀差图形,ppi-cp剪刀差是通过这个指标可以了解当前的经济运行状况,下文更多详细内容介绍需要的小伙伴可以参考一下
    2022-05-05
  • 5道关于python基础 while循环练习题

    5道关于python基础 while循环练习题

    这篇文章主要给大家分享的是5道关于python基础 while循环练习题,无论学习什么语言,练习都是必不可少的,下面文章的练习题挺精湛的,需要的朋友可以参考一下
    2021-11-11
  • 在Linux中通过Python脚本访问mdb数据库的方法

    在Linux中通过Python脚本访问mdb数据库的方法

    这篇文章主要介绍了在Linux中通过Python脚本访问mdb数据库的方法,本文示例基于debian系的Linux系统,需要的朋友可以参考下
    2015-05-05
  • 浅析Python中将单词首字母大写的capitalize()方法

    浅析Python中将单词首字母大写的capitalize()方法

    这篇文章主要介绍了浅析Python中将单词首字母大写的capitalize()方法,是Python入门中的基础知识,需要的朋友可以参考下
    2015-05-05

最新评论