Java连接Emqx实现订阅发布消息的步骤记录

 更新时间:2025年09月22日 10:34:31   作者:一杯冰美式_丶  
这篇文章主要介绍了Java连接Emqx实现订阅发布消息的步骤记录,EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用,需要的朋友可以参考下

一:前提

安装了Emqx开源版、MQTTX客户端

二:订阅发布实现步骤

1.引入依赖

<!--MQTT客户端-->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

2.编辑配置文件

mqtt:
  broker:
    uri: tcp://127.0.0.1:31883
  client:
    id: mqtt-am-client-${random.uuid}
  # 订阅主题配置(支持多个)
  inTopics:
    - topic: test/topic1
      qos: 0
    - topic: test/topic2
      qos: 1
    - topic: test/topic3
      qos: 2
  # 发布主题配置(支持多个)
  outTopics:
    - topic: out/topic1
      qos: 0
  username: am
  password: LGyPtuAB4th5p
  keepAliveInterval: 60

3.读取配置文件

package com.wtzn.web.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {
    private Broker broker;
    private Client client;
    private List<TopicConfig> inTopics;
    private List<TopicConfig> outTopics;
    private String userName;
    private String password;
    private int KeepAliveInterval;

    @Data
    public static class Broker {
        private String uri;
    }

    @Data
    public static class Client {
        private String id;
    }
    @Data
    public static class TopicConfig {
        private String topic;
        private int qos;
    }

}

4.创建Mqtt客户端

package com.wtzn.web.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // 此客户端的用户名和密码
        options.setUserName(mqttProperties.getUserName());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setCleanSession(true);
        // 设置遗嘱消息
      //  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getBytes(), 2, true);
        // 连接超时重试
        options.setConnectionTimeout(5000); //毫秒
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(true);//网络中断重连
        client.connect(options);
        return client;
    }
}

5.controller层

package com.wtzn.web.controller;

import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedList;


@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    @SaIgnore
    @PostMapping("/mqtt")
    public void publish() {
        try {
          //  LinkedList<Payload> payloadLinkedList=new LinkedList<>();
            for(int i=1; i<=10000; i++){
                Payload payload=new Payload();
                payload.setTemperature(i);
              //  payloadLinkedList.add(payload);
                mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));
            }

        } catch (MqttException e) {
            log.error("发布消息失败{}", e.getMessage());
        }
        log.info("发布消息成功");
    }


}

6.service层

package com.wtzn.web.service;

import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;


@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {

    @Autowired
    private MqttClient mqttClient;

    @Autowired
    private MqttProperties mqttProperties;
    
    @PostConstruct
    public void init() throws MqttException {
        mqttClient.setCallback(this);
 /*       mqttClient.subscribe(mqttProperties.getInTopic());
        log.info("订阅主题{}", mqttProperties.getInTopic());
*/
        mqttProperties.getInTopics().forEach(x -> {
            try {
                mqttClient.subscribe(x.getTopic(), x.getQos());
                log.info("订阅主题{}", x.getTopic());
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        });

    }

    @PreDestroy
    public void destroy() throws MqttException {
        mqttClient.disconnect();
        log.info("与服务器断开连接");
    }

    /**
     * @description: 发送消息
     * @param: [message]
     * @return: void
     **/
    public void publish(String topic,int qos,String message) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(qos);
        mqttClient.publish(topic, mqttMessage);
        log.info("向主题【{}】发布消息:【{}】", topic, message);
    }


    /**
     * @description: 接收消息
     * @param: [topic, message]
     * @return: void
     **/
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);
        log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());
      /*  if (payload.getTemperature() > 37) {
            publish("发烧");
        }*/


    }


    @Override
    public void connectionLost(Throwable cause) {
        log.error("连接丢失:{}", cause.getMessage());
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = null;
            try {
                message = token.getMessage();
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }

        log.info("消息已送达");
    }

    @Override
    public void connectComplete(boolean b, String s) {

            mqttProperties.getInTopics().forEach(x -> {
                try {
                    mqttClient.subscribe(x.getTopic(), x.getQos());
                    log.info("订阅主题{}", x.getTopic());
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
            });
    }
}

7.dao层

package com.wtzn.web.domain.bo;

import lombok.Data;

@Data
public class Payload {
    private Integer temperature;
}

三:测试

1.PostMan直接调用测试

2、下载MQTTX客户端进行测试

总结 

到此这篇关于Java连接Emqx实现订阅发布消息的文章就介绍到这了,更多相关Java Emqx订阅发布消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 解析SpringBoot中@Autowire注解的实现原理

    解析SpringBoot中@Autowire注解的实现原理

    在开发Java项目时,依赖注入是一种常见的实现方式,SpringBoot框架通过@Autowired注解来实现依赖注入的功能,本文将介绍SpringBoot中 Autowired注解实现的原理
    2023-06-06
  • Eclipse中自动添加注释(两种)

    Eclipse中自动添加注释(两种)

    本文主要介绍了Eclipse中自动添加注释的两种方法。具有很好的参考价值,下面跟着小编一起来看下吧
    2017-02-02
  • idea打开项目后无法显示目录结构,只能显示.iml文件问题

    idea打开项目后无法显示目录结构,只能显示.iml文件问题

    这篇文章主要介绍了idea打开项目后无法显示目录结构,只能显示.iml文件问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • 这一次搞懂Spring代理创建及AOP链式调用过程操作

    这一次搞懂Spring代理创建及AOP链式调用过程操作

    这篇文章主要介绍了这一次搞懂Spring代理创建及AOP链式调用过程操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-08-08
  • 使用Java servlet实现自动登录退出功能

    使用Java servlet实现自动登录退出功能

    这篇文章主要介绍了使用Java servlet实现自动登录退出功能,,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-11-11
  • IDEA Project不显示/缺失文件问题及解决

    IDEA Project不显示/缺失文件问题及解决

    在侧边栏的project模式下,如果发现缺少部分文件,可以尝试关闭项目,打开项目所在目录,删除目录下的.idea文件夹,然后重新打开项目即可解决
    2024-11-11
  • MapStruct对象映射转换解决Bean属性拷贝性能问题

    MapStruct对象映射转换解决Bean属性拷贝性能问题

    无意间看到项目中有小伙伴用到了 MapStruct 来做对象映射转换当时我就很好奇,这个是什么框架,能够解决什么问题,带着这两个疑问就有了下面的文章
    2022-02-02
  • Java 精炼解读方法的定义与使用

    Java 精炼解读方法的定义与使用

    Java语言中的“方法”(Method)在其他语言当中也可能被称为“函数”(Function)。对于一些复杂的代码逻辑,如果希望重复使用这些代码,并且做到“随时任意使用”,那么就可以将这些代码放在一个大括号“{}”当中,并且起一个名字。使用的时候,直接找到名字调用即可
    2022-03-03
  • Springboot异常错误处理解决方案详解

    Springboot异常错误处理解决方案详解

    这篇文章主要介绍了Springboot异常错误处理解决方案详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • JDK动态代理接口和接口实现类深入详解

    JDK动态代理接口和接口实现类深入详解

    这篇文章主要介绍了JDK动态代理接口和接口实现类,JDK动态代理是代理模式的一种实现方式,因为它是基于接口来做代理的,所以也常被称为接口代理,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-06-06

最新评论