java代码mqtt接收发送消息方式

 更新时间:2023年09月28日 10:33:10   作者:其妙的太空人  
这篇文章主要介绍了java代码mqtt接收发送消息方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

java代码mqtt接收发送消息

mqtt消息第一用到不是太熟悉所以写一篇文章巩固一下。

前提是你已经把mqtt已经安装好,并且启动好了。

首先我们需要两部分代码。

所需依赖

         <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

连接mqtt部分的代码块,因为我不需要发送消息所以把发送消息给注释掉了。

package mqttclient.util;
import lombok.extern.slf4j.Slf4j;
import mqttclient.callback.MqttMessageCallback2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Component
@Slf4j
public class MqttClientUtil2 {
    private String username;
    private String password;
    @Value("tcp://127.0.0.1:1883")//这个是安装mqtt的ip以及端口,1883是mqtt默认端口
    private String host;
    @Value("CYT")//这个随便写但是是唯一的。
    private String clientId;
    @Value("cyt/#")这个是mqtt发送消息的咱们要订阅的topic,cyt/#代表以cyt/开始的所有topic都接收
    private String topic;
    @Value("${mqtt.connection.timeout}")//IOT_MQTT_Yield会block住timeout的时间去尝试接收数据,直到timeout才会退出。可以写在这里也可以写在yml配置文件中
    private int timeOut;
    @Value("${mqtt.keep.alive.interval}")
    private int interval;
    @Autowired
    private MqttMessageCallback2 mqttMessageCallback2;
    private MqttClient mqttClient;
    private MqttConnectOptions mqttConnectOptions;
    @PostConstruct
    private void init(){
        connect(host, clientId,topic);
    }
    /**
     * 链接mqtt
     * @param host
     * @param clientId
     */
    private void connect(String host,String clientId,String topic){
        try{
            mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
            mqttConnectOptions = getMqttConnectOptions();
            //设置回调函数
            mqttClient.setCallback(mqttMessageCallback2);
            //链接mqtt
            mqttClient.connect(mqttConnectOptions);
            //订阅消息
            mqttClient.subscribe(topic,2);
        }catch (Exception e){
            log.error("mqtt服务链接异常!");
            e.printStackTrace();
        }
    }
    /**
     * 设置链接对象信息
     * setCleanSession  true 断开链接即清楚会话  false 保留链接信息 离线还会继续发消息
     * @return
     */
    private MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        /*mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());*/
        mqttConnectOptions.setServerURIs(new String[]{host});
        mqttConnectOptions.setKeepAliveInterval(interval);
        mqttConnectOptions.setConnectionTimeout(timeOut);
        mqttConnectOptions.setCleanSession(true);
        return mqttConnectOptions;
    }
    /**
     *mqtt链接状态
     * @return
     */
    private boolean isConnect(){
        if(Objects.isNull(this.mqttClient)){
            return false;
        }
        return mqttClient.isConnected();
    }
    /**
     * 设置重连
     * @throws Exception
     */
    private void reConnect() throws Exception{
        if(Objects.nonNull(this.mqttClient)){
            log.info("mqtt 服务已重新链接...");
            this.mqttClient.connect(this.mqttConnectOptions);
        }
    }
    /**
     * 断开链接
     * @throws Exception
     */
    private void closeConnect() throws Exception{
        if(Objects.nonNull(this.mqttClient)){
            log.info("mqtt 服务已断开链接...");
            this.mqttClient.disconnect();
        }
    }
/*    *//**
     * 发布消息
     * @param topic
     * @param message
     * @param qos
     * @throws Exception
     *//*
    public void sendMessage(String topic,String message,int qos) throws Exception {
        if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            if(Objects.nonNull(mqttTopic)){
                try{
                    MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
                    if(publish.isComplete()){
                        log.info("消息发送成功---->{}",message);
                    }
                }catch(Exception e){
                    log.error("消息发送异常",e);
                }
            }
        }else{
            reConnect();
        }
    }*/
}

接收消息部分

package mqttclient.callback;
import lombok.extern.slf4j.Slf4j;
import mqttclient.util.ParsingData2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class MqttMessageCallback2 implements MqttCallback {
    /**
     * 链接丢失时处理
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        //可以做重连 或者 其他业务处理
    }
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		System.out.println("接收到消息topic---->{}"+topic);
		System.out.println("接收到消息topic---->{}"+mqttMessage);
        log.info("接收到消息质量qos---->{}",mqttMessage.getQos());
		System.out.println("接收到消息质量qos---->{}"+mqttMessage.getQos());
        log.info("接收到消息具体信息---->{}",new String(mqttMessage.getPayload()));
		System.out.println("接收到消息具体信息---->{}"+mqttMessage.getPayload());
        //结合业务 编写具体信息即可
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}

这个两个写完之后只要有数据发送过来,这边会自动进行接收打印。

是用mqtt网页版图形化界面进行模拟数据发送。

安装mqtt后打开此网站:http://localhost:18083/

默认账号是:admin / public

登录后这边可以设置中文:

模拟发送:这几个地方不用改动但是一定要点击绿色的连接才可以,进行发送。

需要修改的部分是:

然后点击发送就可以收到信息了。 

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • MyBatis缓存功能原理及实例解析

    MyBatis缓存功能原理及实例解析

    这篇文章主要介绍了MyBatis缓存功能原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • SpringSecurity在分布式环境下的使用流程分析

    SpringSecurity在分布式环境下的使用流程分析

    文章介绍了Spring Security在分布式环境下的使用,包括单点登录(SSO)的概念、流程图以及JWT(JSON Web Token)的生成和校验,通过使用JWT和RSA非对称加密,可以实现安全的分布式认证,感兴趣的朋友一起看看吧
    2025-02-02
  • spring boot集成rabbitmq的实例教程

    spring boot集成rabbitmq的实例教程

    这篇文章主要给大家介绍了关于spring boot集成rabbitmq的相关资料,springboot集成RabbitMQ非常简单,文中通过示例代码介绍的非常详细,需要的朋友们可以参考借鉴,下面随着小编来一起学习学习吧。
    2017-11-11
  • Spring Security在标准登录表单中添加一个额外的字段

    Spring Security在标准登录表单中添加一个额外的字段

    这篇文章主要介绍了Spring Security在标准登录表单中添加一个额外的字段,我们将重点关注两种不同的方法,以展示框架的多功能性以及我们可以使用它的灵活方式。 需要的朋友可以参考下
    2019-05-05
  • Spring/SpringBoot @RequestParam注解无法读取application/json格式数据问题解决

    Spring/SpringBoot @RequestParam注解无法读取application/json格式数据问题

    RequestParam用于将指定的请求参数赋值给方法中的形参,可以接受简单类型属性,也可以接受对象类型,一般用于GET请求,下面这篇文章主要给大家介绍了关于Spring/SpringBoot @RequestParam注解无法读取application/json格式数据问题解决的相关资料,需要的朋友可以参考下
    2022-10-10
  • SpringBoot集成JWT生成token及校验方法过程解析

    SpringBoot集成JWT生成token及校验方法过程解析

    这篇文章主要介绍了SpringBoot集成JWT生成token及校验方法过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • 列举java语言中反射的常用方法及实例代码

    列举java语言中反射的常用方法及实例代码

    反射机制指的是程序在运行时能够获取自身的信息。这篇文章主要介绍了列举java语言中反射的常用方法,需要的朋友可以参考下
    2019-07-07
  • Spring Boot Web应用开发 CORS 跨域请求支持

    Spring Boot Web应用开发 CORS 跨域请求支持

    本篇文章主要介绍了Spring Boot Web应用开发 CORS 跨域请求支持,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05
  • JAVA如何读取Excel数据

    JAVA如何读取Excel数据

    这篇文章主要介绍了JAVA如何读取Excel数据,帮助大家更好的理解和学习Java,感兴趣的朋友可以了解下
    2020-09-09
  • Java实现获取前、后N天日期的函数分享

    Java实现获取前、后N天日期的函数分享

    本文给大家分享的是使用java实现的获取当前日期前后N天的函数,非常的简单实用,有需要的小伙伴可以参考下。
    2015-03-03

最新评论