SpringBoot项目接入MQTT的详细指南
一、引言
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,特别适用于物联网(IoT)场景,具有低带宽、高延迟网络环境下的优势。Spring Boot 作为流行的 Java 开发框架,能够方便地与 MQTT 集成,实现高效的消息通信。本文将详细介绍如何在 Spring Boot 项目中接入 MQTT。
二、环境准备
开发环境
- JDK 1.8 及以上版本
- Maven 3.x 或 Gradle
- Spring Boot 2.x 及以上版本
MQTT 服务器 可以选择使用公共的 MQTT 服务器,如 HiveMQ 公共服务器(
tcp://broker.hivemq.com:1883),也可以自行搭建 Mosquitto 等 MQTT 服务器。
三、创建 Spring Boot 项目
可以使用 Spring Initializr(start.spring.io/)快速创建一个 Spring Boot 项目,添加以下依赖:
- Spring Web
- Spring for Apache Pulsar(因为 Pulsar 也支持 MQTT 协议,同时这里我们会使用其相关的 MQTT 依赖)
如果使用 Maven,pom.xml 中添加如下依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>
解释
url:MQTT 服务器的地址和端口。client-id:客户端的唯一标识。default-topic:默认订阅的主题。username和password:如果 MQTT 服务器需要认证,则填写相应的用户名和密码。
五、创建 MQTT 配置类
创建一个配置类来配置 MQTT 连接和消息处理。
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttConfig {
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{"${spring.mqtt.url}"});
options.setUserName("${spring.mqtt.username}");
options.setPassword("${spring.mqtt.password}".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("${spring.mqtt.client-id}", mqttClientFactory(),
"${spring.mqtt.default-topic}");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
System.out.println("Received message: " + message.getPayload());
};
}
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("${spring.mqtt.client-id}-publisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("${spring.mqtt.default-topic}");
return messageHandler;
}
}
解释
mqttClientFactory:创建 MQTT 客户端工厂,配置连接选项。mqttInputChannel和mqttOutputChannel:定义消息通道,用于接收和发送消息。inbound:创建 MQTT 消息驱动的通道适配器,用于订阅主题并接收消息。handler:处理接收到的 MQTT 消息。mqttOutbound:创建 MQTT 消息处理程序,用于发布消息。
六、发送和接收 MQTT 消息
发送消息
创建一个服务类来发送 MQTT 消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
@Service
public class MqttMessageSender {
@Autowired
private MessageChannel mqttOutputChannel;
public void sendMessage(String message) {
mqttOutputChannel.send(new GenericMessage<>(message));
}
}
接收消息
在配置类中已经定义了消息处理逻辑,当接收到消息时,会在 handler 方法中进行处理。
七、测试 MQTT 连接
创建一个控制器来测试 MQTT 消息的发送。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MqttController {
@Autowired
private MqttMessageSender mqttMessageSender;
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
mqttMessageSender.sendMessage(message);
return "Message sent: " + message;
}
}
启动 Spring Boot 应用程序,访问 http://localhost:8080/send?message=Hello, MQTT! 即可发送 MQTT 消息。
八、总结
通过以上步骤,我们成功地在 Spring Boot 项目中接入了 MQTT,实现了消息的发送和接收。MQTT 作为一种轻量级的消息传输协议,与 Spring Boot 的集成可以帮助我们快速构建高效、稳定的物联网消息通信系统。在实际应用中,可以根据需求进一步扩展和优化,如增加消息持久化、多主题订阅等功能。
以上就是SpringBoot项目接入MQTT的详细指南的详细内容,更多关于SpringBoot接入MQTT的资料请关注脚本之家其它相关文章!
相关文章
spring-cloud入门之spring-cloud-config(配置中心)
这篇文章主要介绍了spring-cloud入门之spring-cloud-config(配置中心),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧2018-01-01
如何在Spring Boot微服务使用ValueOperations操作Redis集群String字符串
这篇文章主要介绍了在Spring Boot微服务使用ValueOperations操作Redis集群String字符串类型数据,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2023-06-06


最新评论