Springboot对接mqtt的项目实践

 更新时间:2026年02月10日 09:50:27   作者:IT界Tony哥  
在SpringBoot中对接MQTT协议,可以使用Eclipse Paho客户端和Spring Integration MQTT模块,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

在Spring Boot中对接MQTT协议,可以使用Eclipse Paho客户端和Spring Integration MQTT模块。以下是详细实现步骤:

1. 添加依赖

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- Spring Integration MQTT -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    
    <!-- Eclipse Paho MQTT Client -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
</dependencies>

2. 配置MQTT连接参数

# application.yml
mqtt:
  broker-url: tcp://localhost:1883
  username: admin
  password: password
  client-id: spring-boot-client
  default-topic: test/topic
  timeout: 30
  keepalive: 60
  completion-timeout: 30000

3. MQTT配置类

@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;

    // MQTT连接配置
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setConnectionTimeout(mqttProperties.getTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepalive());
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        return options;
    }

    // MQTT客户端工厂
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions());
        return factory;
    }

    // 出站消息通道(用于发送消息)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = 
            new MqttPahoMessageHandler(mqttProperties.getClientId() + "-producer", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
        return messageHandler;
    }

    // 出站通道
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // 入站消息适配器(用于接收消息)
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = 
            new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId() + "-consumer", 
                                                   mqttClientFactory(), mqttProperties.getDefaultTopic());
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    // 入站通道
    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    // 入站消息处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
                String payload = (String) message.getPayload();
                System.out.println("Received message from topic: " + topic + ", payload: " + payload);
                // 处理接收到的消息
                processMessage(topic, payload);
            }
        };
    }
}

4. 配置属性类

@ConfigurationProperties(prefix = "mqtt")
@Component
@Data
public class MqttProperties {
    private String brokerUrl;
    private String username;
    private String password;
    private String clientId;
    private String defaultTopic;
    private int timeout;
    private int keepalive;
    private int completionTimeout;
}

5. MQTT服务类

@Service
public class MqttService {

    @Autowired
    private MessageChannel mqttOutboundChannel;

    // 发送消息到指定主题
    public void sendMessage(String topic, String message) {
        mqttOutboundChannel.send(MessageBuilder.withPayload(message)
                .setHeader("mqtt_topic", topic)
                .build());
    }

    // 发送消息到默认主题
    public void sendMessage(String message) {
        mqttOutboundChannel.send(MessageBuilder.withPayload(message).build());
    }

    // 发送带QoS的消息
    public void sendMessage(String topic, String message, int qos) {
        mqttOutboundChannel.send(MessageBuilder.withPayload(message)
                .setHeader("mqtt_topic", topic)
                .setHeader("mqtt_qos", qos)
                .build());
    }
}

6. 消息处理器

@Component
public class MqttMessageProcessor {

    private static final Logger logger = LoggerFactory.getLogger(MqttMessageProcessor.class);

    public void processMessage(String topic, String payload) {
        logger.info("Processing MQTT message - Topic: {}, Payload: {}", topic, payload);
        
        // 根据不同的主题进行不同的处理
        switch (topic) {
            case "test/topic":
                handleTestTopic(payload);
                break;
            case "sensor/data":
                handleSensorData(payload);
                break;
            default:
                handleDefaultMessage(topic, payload);
        }
    }

    private void handleTestTopic(String payload) {
        logger.info("处理测试主题消息: {}", payload);
        // 具体的业务逻辑
    }

    private void handleSensorData(String payload) {
        logger.info("处理传感器数据: {}", payload);
        try {
            // 解析JSON数据等操作
            // ObjectMapper mapper = new ObjectMapper();
            // SensorData data = mapper.readValue(payload, SensorData.class);
        } catch (Exception e) {
            logger.error("解析传感器数据失败", e);
        }
    }

    private void handleDefaultMessage(String topic, String payload) {
        logger.info("处理默认消息 - Topic: {}, Payload: {}", topic, payload);
    }
}

7. 控制器示例

@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    @PostMapping("/publish")
    public ResponseEntity<String> publishMessage(@RequestParam String topic, 
                                               @RequestParam String message) {
        try {
            mqttService.sendMessage(topic, message);
            return ResponseEntity.ok("Message published successfully");
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Failed to publish message: " + e.getMessage());
        }
    }

    @PostMapping("/publish/default")
    public ResponseEntity<String> publishToDefaultTopic(@RequestParam String message) {
        try {
            mqttService.sendMessage(message);
            return ResponseEntity.ok("Message published to default topic");
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Failed to publish message: " + e.getMessage());
        }
    }
}

8. 主应用类

@SpringBootApplication
@EnableConfigurationProperties
public class MqttApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class, args);
    }
}

9. 测试MQTT服务

可以使用MQTT.fx或其他MQTT客户端工具进行测试:

  1. 启动Spring Boot应用
  2. 使用MQTT客户端订阅主题 test/topic
  3. 调用API发送消息:
curl -X POST "http://localhost:8080/mqtt/publish?topic=test/topic&message=Hello MQTT"

主要特性

  • 自动重连: 配置了自动重连机制
  • QoS支持: 支持不同的服务质量等级
  • 多主题订阅: 可以订阅多个主题
  • 异步处理: 消息发送支持异步模式
  • 配置灵活: 通过配置文件管理连接参数

这样你就实现了一个完整的Spring Boot MQTT集成方案,可以方便地进行消息的发布和订阅。

到此这篇关于Springboot对接mqtt的项目实践的文章就介绍到这了,更多相关Springboot对接mqtt内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • CentOS8.2安装Java 14.0.2的教程详解

    CentOS8.2安装Java 14.0.2的教程详解

    这篇文章主要介绍了CentOS8.2安装Java 14.0.2的详细教程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • 如何利用Java爬虫获取苏宁易购商品详情

    如何利用Java爬虫获取苏宁易购商品详情

    苏宁易购作为中国领先的电商平台之一,提供了丰富的商品信息,本文将介绍如何使用Java语言开发爬虫,获取苏宁易购商品的详细信息,感兴趣的朋友一起看看吧
    2024-12-12
  • Java开发中请求头的概念与写法代码示例

    Java开发中请求头的概念与写法代码示例

    本文介绍了Java开发中请求头(RequestHeaders)的用途、组成和常见字段,并提供了使用HttpURLConnection、HttpClient(Java11及以上)和ApacheHttpClient发送带有请求头的HTTP请求的代码示例,感兴趣的朋友跟随小编一起看看吧
    2025-12-12
  • 深入讲解java线程与synchronized关键字

    深入讲解java线程与synchronized关键字

    Java 中多线程的同步依靠的是对象锁机制,synchronized关键字就是利用了封装对象锁来实现对共享资源的互斥访问。下面这篇文章主要介绍了java线程与synchronized关键字的相关资料,需要的朋友可以参考下。
    2017-03-03
  • JavaWeb入门:ServletContext详解和应用

    JavaWeb入门:ServletContext详解和应用

    这篇文章主要介绍了Java ServletContext对象用法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2021-07-07
  • mybatis中BigDecimal中的0存为null的坑及解决

    mybatis中BigDecimal中的0存为null的坑及解决

    在使用MyBatis进行数据库操作时,若Java中属性类型为BigDecimal且值为0,插入数据库时可能会变为null,而不是0,这个问题可能是由于MyBatis在处理BigDecimal类型时的弱类型判断导致的,当BigDecimal变量与空字符串进行比较时,MyBatis可能将其视为null
    2024-10-10
  • 使用Java Minio搭建自己的文件系统详解

    使用Java Minio搭建自己的文件系统详解

    这篇文章主要介绍了使用Java Minio搭建自己的文件系统的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2021-09-09
  • Java启动命令大全(汇总)

    Java启动命令大全(汇总)

    Java启动命令是所有java应用程序的入口,通过它来启动Java运行时环境,并加载相关的class,本文希望做一个Java启动命令的汇总,和各位同道分享,也便于日后作为自己的参考
    2023-09-09
  • Java网络编程之简单的服务端客户端应用实例

    Java网络编程之简单的服务端客户端应用实例

    这篇文章主要介绍了Java网络编程之简单的服务端客户端应用,以实例形式较为详细的分析了java网络编程的原理与服务器端客户端的实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-04-04
  • java模拟cookie登陆操作

    java模拟cookie登陆操作

    这篇文章主要为大家详细介绍了java模拟cookie登陆操作,模拟登陆,取得cookie以记录身份,下次请求时发送cookie以表明身份,感兴趣的小伙伴们可以参考一下
    2016-07-07

最新评论