Java连接Emqx实现订阅发布消息的步骤记录

 更新时间:2025年09月22日 10:34:31   作者:一杯冰美式_丶  
这篇文章主要介绍了Java连接Emqx实现订阅发布消息的步骤记录,EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用,需要的朋友可以参考下

一:前提

安装了Emqx开源版、MQTTX客户端

二:订阅发布实现步骤

1.引入依赖

<!--MQTT客户端-->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

2.编辑配置文件

mqtt:
  broker:
    uri: tcp://127.0.0.1:31883
  client:
    id: mqtt-am-client-${random.uuid}
  # 订阅主题配置(支持多个)
  inTopics:
    - topic: test/topic1
      qos: 0
    - topic: test/topic2
      qos: 1
    - topic: test/topic3
      qos: 2
  # 发布主题配置(支持多个)
  outTopics:
    - topic: out/topic1
      qos: 0
  username: am
  password: LGyPtuAB4th5p
  keepAliveInterval: 60

3.读取配置文件

package com.wtzn.web.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {
    private Broker broker;
    private Client client;
    private List<TopicConfig> inTopics;
    private List<TopicConfig> outTopics;
    private String userName;
    private String password;
    private int KeepAliveInterval;

    @Data
    public static class Broker {
        private String uri;
    }

    @Data
    public static class Client {
        private String id;
    }
    @Data
    public static class TopicConfig {
        private String topic;
        private int qos;
    }

}

4.创建Mqtt客户端

package com.wtzn.web.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // 此客户端的用户名和密码
        options.setUserName(mqttProperties.getUserName());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setCleanSession(true);
        // 设置遗嘱消息
      //  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getBytes(), 2, true);
        // 连接超时重试
        options.setConnectionTimeout(5000); //毫秒
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(true);//网络中断重连
        client.connect(options);
        return client;
    }
}

5.controller层

package com.wtzn.web.controller;

import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedList;


@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    @SaIgnore
    @PostMapping("/mqtt")
    public void publish() {
        try {
          //  LinkedList<Payload> payloadLinkedList=new LinkedList<>();
            for(int i=1; i<=10000; i++){
                Payload payload=new Payload();
                payload.setTemperature(i);
              //  payloadLinkedList.add(payload);
                mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));
            }

        } catch (MqttException e) {
            log.error("发布消息失败{}", e.getMessage());
        }
        log.info("发布消息成功");
    }


}

6.service层

package com.wtzn.web.service;

import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;


@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {

    @Autowired
    private MqttClient mqttClient;

    @Autowired
    private MqttProperties mqttProperties;
    
    @PostConstruct
    public void init() throws MqttException {
        mqttClient.setCallback(this);
 /*       mqttClient.subscribe(mqttProperties.getInTopic());
        log.info("订阅主题{}", mqttProperties.getInTopic());
*/
        mqttProperties.getInTopics().forEach(x -> {
            try {
                mqttClient.subscribe(x.getTopic(), x.getQos());
                log.info("订阅主题{}", x.getTopic());
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        });

    }

    @PreDestroy
    public void destroy() throws MqttException {
        mqttClient.disconnect();
        log.info("与服务器断开连接");
    }

    /**
     * @description: 发送消息
     * @param: [message]
     * @return: void
     **/
    public void publish(String topic,int qos,String message) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(qos);
        mqttClient.publish(topic, mqttMessage);
        log.info("向主题【{}】发布消息:【{}】", topic, message);
    }


    /**
     * @description: 接收消息
     * @param: [topic, message]
     * @return: void
     **/
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);
        log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());
      /*  if (payload.getTemperature() > 37) {
            publish("发烧");
        }*/


    }


    @Override
    public void connectionLost(Throwable cause) {
        log.error("连接丢失:{}", cause.getMessage());
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = null;
            try {
                message = token.getMessage();
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }

        log.info("消息已送达");
    }

    @Override
    public void connectComplete(boolean b, String s) {

            mqttProperties.getInTopics().forEach(x -> {
                try {
                    mqttClient.subscribe(x.getTopic(), x.getQos());
                    log.info("订阅主题{}", x.getTopic());
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
            });
    }
}

7.dao层

package com.wtzn.web.domain.bo;

import lombok.Data;

@Data
public class Payload {
    private Integer temperature;
}

三:测试

1.PostMan直接调用测试

2、下载MQTTX客户端进行测试

总结 

到此这篇关于Java连接Emqx实现订阅发布消息的文章就介绍到这了,更多相关Java Emqx订阅发布消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中的WeakHashMap详解

    Java中的WeakHashMap详解

    这篇文章主要介绍了Java中的WeakHashMap详解,WeakHashMap可能平时使用的频率并不高,但是你可能听过WeakHashMap会进行自动回收吧,下面就对其原理进行分析,需要的朋友可以参考下
    2023-09-09
  • Netty与NIO超详细讲解

    Netty与NIO超详细讲解

    Netty本质上是一个NIO的框架,适用于服务器通讯相关的多种应用场景。底层是NIO,NIO底层是Java IO和网络IO,再往下是TCP/IP协议,下面我们跟随文章来详细了解
    2022-08-08
  • 剑指Offer之Java算法习题精讲数组与字符和等差数列

    剑指Offer之Java算法习题精讲数组与字符和等差数列

    跟着思路走,之后从简单题入手,反复去看,做过之后可能会忘记,之后再做一次,记不住就反复做,反复寻求思路和规律,慢慢积累就会发现质的变化
    2022-03-03
  • SpringBoot项目启动打包报错类文件具有错误的版本 61.0, 应为 52.0的解决方法

    SpringBoot项目启动打包报错类文件具有错误的版本 61.0, 应为 52.0的解决

    这篇文章主要给大家介绍了关于SpringBoot项目启动打包报错类文件具有错误的版本 61.0, 应为 52.0的解决方法,文中有详细的排查过程和解决方法,通过代码介绍的非常详细,需要的朋友可以参考下
    2023-11-11
  • Spring Boot分段处理List集合多线程批量插入数据的解决方案

    Spring Boot分段处理List集合多线程批量插入数据的解决方案

    大数据量的List集合,需要把List集合中的数据批量插入数据库中,本文给大家介绍Spring Boot分段处理List集合多线程批量插入数据的解决方案,感兴趣的朋友跟随小编一起看看吧
    2024-04-04
  • Java线程状态变换过程代码解析

    Java线程状态变换过程代码解析

    这篇文章主要介绍了Java线程状态变换过程代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-06-06
  • Java动态线程池插件dynamic-tp集成过程浅析

    Java动态线程池插件dynamic-tp集成过程浅析

    这篇文章主要介绍了Java动态线程池插件dynamic-tp集成过程,dynamic-tp是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改
    2023-03-03
  • Java项目中如何访问WEB-INF下jsp页面

    Java项目中如何访问WEB-INF下jsp页面

    这篇文章主要介绍了Java项目中如何访问WEB-INF下jsp页面,文章通过示例代码和图文解析介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • 手把手教你如何搭建SpringBoot+Vue前后端分离

    手把手教你如何搭建SpringBoot+Vue前后端分离

    这篇文章主要介绍了手把手教你如何搭建SpringBoot+Vue前后端分离,前后端分离是目前开发中常用的开发模式,达成充分解耦,需要的朋友可以参考下
    2023-03-03
  • Java中Cookie和Session的那些事儿

    Java中Cookie和Session的那些事儿

    Cookie和Session都是为了保持用户的访问状态,一方面为了方便业务实现,另一方面为了简化服务端的程序设计。这篇文章主要介绍了java中cookie和session的知识,需要的朋友可以参考下
    2016-09-09

最新评论