Springboot整合mqtt实现软硬件通信的示例代码

 更新时间:2025年12月26日 10:37:35   作者:程序陆  
本文介绍了如何使用Spring Boot整合MQTT协议,实现物联网软硬件通信, MQTT是一种轻量级的消息队列遥测传输协议,适用于物联网设备通信和远程监控,本文给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧

前言

本文实现Springboot整合mqtt,更好地实现物联网软硬件通信。

一、mqtt是什么?

MQTT 是消息队列遥测传输的缩写,是一种轻量级、基于发布 / 订阅模式的物联网通信协议。

应用场景:

  • 物联网设备通信,比如智能家居、工业传感器、穿戴设备。
  • 远程监控与数据采集,例如环境监测、设备状态上报。

注:这里实现的是软件后端层面的mqtt,是对硬件消息的接收和发送,如果要做一个完整的物联网项目,也需要硬件层面连接mqtt服务器,并且订阅、发布相关主题的信息。

二、物联网项目结构图

该图为完整的mqtt项目结构图。其中mqtt服务器部分可以用官方提供的公共的服务器,也可以在自己的服务器上搭建,只需要在后端配置文件添加mqtt服务器ip地址等相关信息。

注:Springboot项目不是自己内部开一个mqtt服务,所以我们只需要连接并使用相应的mqtt服务器即可。

三、实现代码

在pom.xml中引入Maven坐标:

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

在Springboot配置文件中配置mqtt服务器相关信息

spring:
  mqtt:
    url: tcp://broker.emqx.io:1883
    #用户名
    username: admin
    #密码
    password: 123456
    #客户端id(不能重复)
    client:
      id: consumer-id
    #MQTT默认的消息推送主题,实际可在调用接口时指定
    default:
      topic: topic

这里的mqtt服务器地址 tcp://broker.emqx.io:1883 是EMQX官方提供的一个公共的服务器,如果是想看一下初步效果,可以使用该服务器。

接下来我们可以放两部分代码,一部分就是接收消息,也就是订阅消息的部分,另一部分就是用来发布相关消息。

订阅消息部分:

@Configuration
public class MqttConsumerConfig {
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.url}")
    private String hostUrl;
    @Value("${spring.mqtt.client.id}")
    private String clientId;
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;
    /**
     * 客户端对象
     */
    private MqttClient client;
    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    }
    /**
     * 客户端连接服务端
     */
    public void connect(){
        try {
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
//            //设置连接用户名
//            options.setUserName(username);
//            //设置连接密码
//            options.setPassword(password.toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttConsumerCallBack());
            client.connect(options);
            //订阅主题
            //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
            int[] qos = {1};
            System.out.println("连接");
            //主题
            //String[] topics = {"data","status"};
            client.subscribe(topics,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 断开连接
     */
    public void disConnect(){
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 订阅主题
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
@Component
public class MqttConsumerCallBack implements MqttCallback{
    /**
     * 客户端断开连接的回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("与服务器断开连接,可重连");
    }
    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主题 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
        if(topic.equals("data")){
  System.out.println(String.format("接收消息retained : %b",message.isRetained()));
        }
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }
    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}

接下来是发布消息部分:

@Configuration
@Slf4j
public class MqttProviderConfig {
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.url}")
    private String hostUrl;
    private String clientId = "provider_id";
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;
    /**
     * 客户端对象
     */
    private MqttClient client;
    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    }
    /**
     * 客户端连接服务端
     */
    public void connect(){
        try{
           // System.out.println("Connecting to MQTT server: " + hostUrl + " with client ID: " + clientId);
            //创建MQTT客户端对象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(username);
            //设置连接密码
            options.setPassword(password.toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttProviderCallBack());
            client.connect(options);
        } catch(MqttException e){
            e.printStackTrace();
        }
    }
    public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage(); //创建消息实例
        mqttMessage.setQos(qos);  //设置qos
        mqttMessage.setRetained(retained);  //设置是否保留信息
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
@Configuration
public class MqttProviderCallBack implements MqttCallback{
    @Value("${spring.mqtt.client.id}")
    private String clientId;
    /**
     * 与服务器断开的回调
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println(clientId+"与服务器断开连接");
    }
    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
    }
    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        System.out.println(client.getClientId()+"发布消息成功!");
    }
}

接下来就是调用publish方法,传入相应参数,即可完成消息的发布。

这里有几个参数要注意一下:

一、MQTT Topic(消息路由核心)

Topic 是客户端发布 / 订阅消息的 “地址标识”,是实现 “发布 - 订阅” 模式的基础。

  • 核心属性:字符串格式,无预定义结构,由客户端自定义,比如 “device/light/ 客厅”“data/sensor/ 温度”。
  • 层级与通配符:用斜杠 “/” 划分层级,支持两种通配符订阅 ——“+” 匹配单个层级(如 “device/+/ 客厅” 匹配 “device/light/ 客厅”),“#” 匹配当前及所有子层级(如 “data/#” 匹配所有数据类主题)。
  • 核心规则:发布者仅需指定 Topic 发送消息,订阅者通过匹配 Topic(或通配符)接收消息,发布者与订阅者无直接关联,实现解耦。

二、MQTT QoS(消息传输质量等级)

QoS(Quality of Service)定义了消息从发布者到订阅者的传输可靠性,MQTT 3.1.1 标准规定了 3 个等级,优先级从低到高。

  1. QoS 0(最多一次):消息仅发送一次,不确认、不重发,可能丢失。适用于对可靠性要求低的场景,如实时温度上报。
  2. QoS 1(至少一次):消息确保送达,但可能重复。发布者发送后等待确认,未收到确认则重发,直到订阅者确认接收。
  3. QoS 2(恰好一次):消息确保仅送达一次,无丢失、无重复。通过 “发布 - 确认 - 释放 - 完成” 四次握手实现,适用于金融交易、指令下发等关键场景。

三、MQTT Retained

Retained 消息是 Broker(服务器)为指定 Topic 保存的 “最新一条消息”,具备 “状态快照” 属性。

  • 发布时触发:客户端发布消息时,需显式设置 “Retain 标志位” 为 true,Broker 才会保存该消息。
  • 仅存最新:同一 Topic 后续发布的 Retained 消息会覆盖旧消息,Broker 始终只保留该 Topic 的最新状态。
  • 在有别的客户端订阅该Topic时,如果其“Retain 标志位” 为 true,一旦连接,客户端马上会收到一条保存的最后发布的该Topic的消息。

总结

本文详细描述了mqtt项目大致结构、Sringboot整合mqtt代码、mqtt的几个重要参数,希望对未来的架构师们有帮助,谢谢~

到此这篇关于Springboot整合mqtt实现软硬件通信的示例代码的文章就介绍到这了,更多相关Springboot整合mqtt通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • mybatis plus in方法使用详解

    mybatis plus in方法使用详解

    这篇文章主要介绍了mybatis plus in方法使用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04
  • 使用maven方式创建springboot项目的方式

    使用maven方式创建springboot项目的方式

    使用Spring Initializr创建spring boot项目,因为外网问题导致很难成功,所以只能使用maven方式,这里介绍下使用maven方式创建springboot项目的方法,感兴趣的朋友一起看看吧
    2022-09-09
  • Mybatis-plus如何通过反射实现动态排序不同字段功能

    Mybatis-plus如何通过反射实现动态排序不同字段功能

    这篇文章主要介绍了Mybatis-plus如何通过反射实现动态排序不同字段功能,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-02-02
  • Caused by: java.lang.ClassNotFoundException: org.apache.commons.collections.Transformer异常

    Caused by: java.lang.ClassNotFoundException: org.apache.comm

    这篇文章主要介绍了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type异常,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • 深入浅出的讲解Java关键字final的作用

    深入浅出的讲解Java关键字final的作用

    final是Java中非常常见的一个关键字,可以说每天都在使用它,虽然常见,但却也不见得都那么显而易见,今天就来研究一下final,以加深对它的理解和更合理的运用,需要的朋友可以参考下
    2023-06-06
  • JavaWeb dbutils执行sql命令并遍历结果集时不能查到内容的原因分析

    JavaWeb dbutils执行sql命令并遍历结果集时不能查到内容的原因分析

    这篇文章主要介绍了JavaWeb dbutils执行sql命令并遍历结果集时不能查到内容的原因分析及简单处理方法,文中给大家介绍了javaweb中dbutils的使用,需要的朋友可以参考下
    2017-12-12
  • 一个JAVA小项目--Web应用自动生成Word

    一个JAVA小项目--Web应用自动生成Word

    前段时间接到一个Web应用自动生成Word的需求,现整理了下一些关键步骤拿来分享一下。
    2014-05-05
  • JVM 方法调用之静态分派(详解)

    JVM 方法调用之静态分派(详解)

    下面小编就为大家带来一篇JVM 方法调用之静态分派(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05
  • 使用springboot单例模式与线程安全问题踩的坑

    使用springboot单例模式与线程安全问题踩的坑

    这篇文章主要介绍了使用springboot单例模式与线程安全问题踩的坑,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Java自学书籍Top 10

    Java自学书籍Top 10

    这篇文章主要为大家推荐了Java书籍Top 10,是由Java Inside推荐的十本不错的Java书籍,感兴趣的小伙伴们可以参考一下
    2016-09-09

最新评论