SpringBoot+MQTT+apollo实现订阅发布功能的示例

 更新时间:2020年06月16日 14:53:03   作者:来串糖葫芦  
这篇文章主要介绍了SpringBoot+MQTT+apollo实现订阅发布功能的示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

由于最近公司在开发一款后台与安卓的更新系统,经过再三研究之后,也是选择Mqtt这个目前流行的框架。为了能够让项目运营起来,最终虽说是选择ActiveMQ。但在这个过程中,也是发现Apollo作为服务器也是相当不错。当然对于后者已经被apace放弃,不过今天还是和大家整理一下SpringBoot+MQTT+apollo实现订阅发布功能的全过程。

对于项目首先需要用到的前提东西,比如Apollo如何下载,以及MQTT测试工具在这里就不多说。如果真的不懂私聊Damon吧,在这里就不浪费时间。

对于项目,首先你所需要引入maven包:

pom.xml

  <!-- MQTT -->
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
  </dependency>

其目标就是将MQTT用在项目组中
接着就是项目yml文件的配置,使用properties,以葫芦画瓢就行了:

applicaiton.yml

mqtt:
 username: admin
 password: password
 host-url: tcp://127.0.0.1:8161 # 你自己服务器的地址和端口,这个需要改
 clientID: test1    # 这个改不改随意,但不同的客户端肯定不能一样
 default-topic: home/garden/fountain   # 默认主题
 timeout: 100
 keepalive: 100

# Tomcat
server:
 tomcat:
  uri-encoding: UTF-8
  max-threads: 1000
  min-spare-threads: 30
 port: 8088

注意host-url,这就是你apollo的地址

来到第三步,此时就是项目内的文件:

MqttConfig文件

@Component
@ConfigurationProperties("mqtt")
@Setter
@Getter
public class MqttConfig {
  @Autowired
  private MqttPushClient mqttPushClient;

  /**
   * 用户名
   */
  // @Value("username")
  private String username;
  /**
   * 密码
   */
  private String password;
  /**
   * 连接地址
   */
  private String hostUrl;
  /**
   * 客户Id
   */
  private String clientID;
  /**
   * 默认连接话题
   */
  private String defaultTopic;
  /**
   * 超时时间
   */
  private int timeout;
  /**
   * 保持连接数
   */
  private int keepalive;

  @Bean
  public MqttPushClient getMqttPushClient() {
    System.out.println("hostUrl: "+ hostUrl);
    System.out.println("clientID: "+ clientID);
    System.out.println("username: "+ username);
    System.out.println("password: "+ password);
    System.out.println("timeout: "+timeout);
    System.out.println("keepalive: "+ keepalive);
    mqttPushClient.connect(hostUrl, clientID, username, password, timeout, keepalive);
    // 以/#结尾表示订阅所有以test开头的主题
    mqttPushClient.subscribe(defaultTopic, 0);
    return mqttPushClient;
  }
}

目的就是配置所对应的消息

第四步就是发布以及订阅等功能:

MqttPushClient

@Component
public class MqttPushClient {
  private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

  @Autowired
  private PushCallback pushCallback;

  private static MqttClient client;

  private static MqttClient getClient() {
    return client;
  }

  private static void setClient(MqttClient client) {
    MqttPushClient.client = client;
  }

  /**
   * 客户端连接
   *
   * @param host   ip+端口
   * @param clientID 客户端Id
   * @param username 用户名
   * @param password 密码
   * @param timeout  超时时间
   * @param keepalive 保留数
   */
  public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
    MqttClient client;
    try {
      client = new MqttClient(host, clientID, new MemoryPersistence());
      MqttConnectOptions options = new MqttConnectOptions();
      options.setCleanSession(true);
      options.setUserName(username);
      options.setPassword(password.toCharArray());
      options.setConnectionTimeout(timeout);
      options.setKeepAliveInterval(keepalive);
      MqttPushClient.setClient(client);
      try {
        client.setCallback(pushCallback);
        client.connect(options);
      } catch (Exception e) {
        e.printStackTrace();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * 发布
   *
   * @param qos     连接方式
   * @param retained  是否保留
   * @param topic    主题
   * @param pushMessage 消息体
   */
  public void publish(int qos, boolean retained, String topic, String pushMessage) {
    MqttMessage message = new MqttMessage();
    message.setQos(qos);
    message.setRetained(retained);
    message.setPayload(pushMessage.getBytes());
    MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
    if (null == mTopic) {
      logger.error("topic not exist");
    }
    MqttDeliveryToken token;
    try {
      token = mTopic.publish(message);
      token.waitForCompletion();
    } catch (MqttPersistenceException e) {
      e.printStackTrace();
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }

  /**
   * 订阅某个主题
   *
   * @param topic 主题
   * @param qos  连接方式
   */
  public void subscribe(String topic, int qos) {
    logger.info("开始订阅主题" + topic);
    try {
      MqttPushClient.getClient().subscribe(topic, qos);
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
}

订阅主题以及发布的方式等内容更多编写
最后在搞个测试看看我们的结果是否正确:

TestController

@RestController
@RequestMapping("/")
public class TestController {

  @Autowired
  private MqttPushClient mqttPushClient;

  @GetMapping(value = "/publishTopic")
  public String publishTopic() {
    String topicString = "home/garden/fountain";
    mqttPushClient.publish(0, false, topicString, "测试一下发布消息");
    return "ok";
  }
  // 发送自定义消息内容(使用默认主题)
  @RequestMapping("/publishTopic/{data}")
  public String test1(@PathVariable("data") String data) {
    String topicString = "home/garden/fountain";
    mqttPushClient.publish(0,false,topicString, data);
    return "ok";
  }

  // 发送自定义消息内容,且指定主题
  @RequestMapping("/publishTopic/{topic}/{data}")
  public String test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {
    mqttPushClient.publish(0,false,topic, data);
    return "ok";
  }
}

如此一来就OK!

你可以使用MQTT.fx进行测试。用Postman发出,就能够查看最终的结果。在这里,因为时间的原因就不多说,有啥有趣的问题,咱们可以一同探讨。 希望你希望,Damon将会不断的分享各种有趣的开发小故事给大家娱乐。下一期,或在POI实现导出导入或者是ActiveMQ进行选择。

到此这篇关于SpringBoot+MQTT+apollo实现订阅发布功能的示例的文章就介绍到这了,更多相关SpringBoot+MQTT+apollo订阅发布内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 深入JAVA对象深度克隆的详解

    深入JAVA对象深度克隆的详解

    本篇文章是对JAVA对象深度克隆进行了详细的分析介绍,需要的朋友参考下
    2013-05-05
  • Java异步编程Future应用方式

    Java异步编程Future应用方式

    Java中的Future接口用于构建复杂并行操作,它允许异步执行任务,并在需要时获取结果,通过Future接口,可以避免多线程编程中的一些常见问题,如线程执行顺序和结果获取的复杂性,然而,在使用Future时需要注意,并行执行可能会变为串行执行,特别是在使用get()方法时
    2025-02-02
  • Hibernate 与 Mybatis 的共存问题,打破你的认知!(两个ORM框架)

    Hibernate 与 Mybatis 的共存问题,打破你的认知!(两个ORM框架)

    这篇文章主要介绍了Hibernate 与 Mybatis 如何共存?本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Java中Set与List的关系与区别介绍

    Java中Set与List的关系与区别介绍

    这篇文章主要介绍了Java中Set与List的关系与区别介绍,本文总结它们两个接口都是继承自Collection、它们之间的存储方式不一样,需要的朋友可以参考下
    2015-03-03
  • Java实现跳转到指定页面的方法小结

    Java实现跳转到指定页面的方法小结

    在Java中,实现页面跳转主要涉及到Web开发,而这通常通过使用Java的Web框架(如Servlet、Spring MVC)来完成,下面讲解一下如何在不同的Java Web框架中实现页面跳转,文中有详细的代码示例供大家参考,需要的朋友可以参考下
    2024-05-05
  • java线程的基础实例解析

    java线程的基础实例解析

    java中线程的基本方法的熟练使用是精通多线程编程的必经之路,线程相关的基本方法有wait,notify,notifyAll,sleep,join,yield等,本文浅要的介绍一下它们的使用方式
    2021-06-06
  • Java中的Vector详细解读

    Java中的Vector详细解读

    这篇文章主要介绍了Java中的Vector详细解读,Vector是实现了List接口的子类,其底层是一个对象数组,维护了一个elementData数组,是线程安全的,Vector类的方法带有synchronized关键字,在开发中考虑线程安全中使用Vector,需要的朋友可以参考下
    2023-09-09
  • 关于Java整合RocketMQ实现生产消费详解

    关于Java整合RocketMQ实现生产消费详解

    这篇文章主要介绍了关于Java整合RocketMQ实现生产消费详解,RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,需要的朋友可以参考下
    2023-05-05
  • Java实现按行读取大文件

    Java实现按行读取大文件

    这篇文章主要介绍了Java实现按行读取大文件的方法的小结,非常的简单实用,有需要的小伙伴尅参考下。
    2015-05-05
  • 浅谈springboot内置tomcat和外部独立部署tomcat的区别

    浅谈springboot内置tomcat和外部独立部署tomcat的区别

    这篇文章主要介绍了浅谈springboot内置tomcat和外部独立部署tomcat的区别,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10

最新评论