vue3中如何使用mqtt数据传输

 更新时间:2024年11月01日 16:15:55   作者:宝拉不想努力了  
这篇文章主要为大家详细介绍了vue3中如何使用mqtt数据传输,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

使用版本

"mqtt": "^5.8.0",

安装指令

npm install mqtt --save
------
yarn add mqtt

配置

connection: {
  protocol: "ws",
  host: "broker.emqx.io",
  port: 8083,
  endpoint: "/mqtt",
  clean: true,
  connectTimeout: 30 * 1000, // ms
  reconnectPeriod: 4000, // ms
  clientId: "emqx_vue_" + Math.random().toString(16).substring(2, 8),
  // 随机数 每次不能重复
  username: "emqx_test",
  password: "emqx_test",
},

连接

import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)

client.on('connect', (e) => {
  // 订阅主题
  
})

订阅主题

client.subscribe(topic, { qos: 1 }, (err) => {
  if (!err) {
    console.log('订阅成功')
  } else {
    console.log('消息订阅失败!')
  }
})

消息发布

给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。

client.publish(publishTopic, `{"messageType":1,"messageContent":""}`, { qos: 0 }, (err) => {
  if (!err) {
    console.log('发送成功')
    client.subscribe(topic, { qos: 1 }, (err) => {
      if (!err) {
        console.log('订阅成功')
      } else {
        console.log('消息订阅失败!')
      }
    })
  } else {
    console.log('消息发送失败!')
  }
})

取消订阅

client.unsubscribe(topicList, (error) => {
  console.log('主题为' + topicList + '取消订阅成功', error)
})

断开连接

export function unconnect() {
  client.end()
  client = null
  // Message.warning('服务器已断开连接!')
  console.log('服务器已断开连接!')
}

mqtt封装使用(ts版)

import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';

interface ClientOptions extends IClientOptions {
  clientId: string;
}

interface SubscribeOptions {
  topic: string;
  callback: (topic: string, message: string) => void;
  subscribeOption?: mqtt.IClientSubscribeOptions;
}

interface PublishOptions {
  topic: string;
  message: string;
}

class Mqtt {
  private static instance: Mqtt;
  private client: MqttClient | undefined;
  private subscribeMembers: Record<string, ((topic: string, message: string) => void) | undefined> = {};
  private pendingSubscriptions: SubscribeOptions[] = [];
  private pendingPublications: PublishOptions[] = [];
  private isConnected: boolean = false;

  private constructor(url?: string) {
    if (url) {
      this.connect(url);
    }
  }

  public static getInstance(url?: string): Mqtt {
    if (!Mqtt.instance) {
      Mqtt.instance = new Mqtt(url);
    } else if (url && !Mqtt.instance.client) {
      Mqtt.instance.connect(url);
    }
    return Mqtt.instance;
  }

  private connect(url: string): void {
    console.log(url, clientOptions);
    if (!this.client) {
      this.client = mqtt.connect(url, clientOptions);
      this.client.on('connect', this.onConnect);
      this.client.on('reconnect', this.onReconnect);
      this.client.on('error', this.onError);
      this.client.on('message', this.onMessage);
    }
  }

  public disconnect(): void {
    if (this.client) {
      this.client.end();
      this.client = undefined;
      this.subscribeMembers = {};
      this.isConnected = false;
      console.log(`服务器已断开连接!`);
    }
  }

  public subscribe({ topic, callback }: SubscribeOptions): void {
    if (this.isConnected) {
      this.client?.subscribe(topic, { qos: 1 }, error => {
        if (error) {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `, error);
        } else {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);
        }
      });
      this.subscribeMembers[topic] = callback;
    } else {
      this.pendingSubscriptions.push({ topic, callback });
    }
  }

  public unsubscribe(topic: string): void {
    if (!this.client) {
      return;
    }
    this.client.unsubscribe(topic, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers[topic] = undefined;
  }

  public publish({ topic, message }: PublishOptions): void {
    if (this.isConnected) {
      this.client?.publish(topic, message, { qos: 1 }, e => {
        if (e) {
          console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e);
        }
      });
    } else {
      this.pendingPublications.push({ topic, message });
    }
  }

  private onConnect = (e: any): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, e);
    this.isConnected = true;
    this.processPendingSubscriptions();
    this.processPendingPublications();
  };

  private onReconnect = (): void => {
    console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);
    this.isConnected = false;
  };

  private onError = (error: Error): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);
    this.isConnected = false;
  };

  private onMessage = (topic: string, message: Buffer): void => {
    // console.log(
    //   `客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `,
    //   message.toString(),
    // );
    const callback = this.subscribeMembers?.[topic];
    callback?.(topic, message.toString());
  };

  private processPendingSubscriptions(): void {
    while (this.pendingSubscriptions.length > 0) {
      const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;
      this.subscribe({ topic, callback, subscribeOption });
    }
  }

  private processPendingPublications(): void {
    while (this.pendingPublications.length > 0) {
      const { topic, message } = this.pendingPublications.shift()!;
      this.publish({ topic, message });
    }
  }
}

const clientOptions: ClientOptions = {
  clean: true,
  connectTimeout: 500,
  protocolVersion: 5,
  rejectUnauthorized: false,
  username: 'admin',
  password: 'Anjian-emqx',
  clientId: `client-${Date.now()}`
};

// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);

注意:

1.环境配置

.env.test
VITE_OTHER_SERVICE_BASE_URL= `{
  "mqtt": "ws://192.168.11.14:8083/mqtt"
}`

2.qos设置 前后端统一为1

以上就是vue3中如何使用mqtt数据传输的详细内容,更多关于vue3 mqtt数据传输的资料请关注脚本之家其它相关文章!

相关文章

  • vue2封装webSocket的实现(开箱即用)

    vue2封装webSocket的实现(开箱即用)

    在Vue2中,可以使用WebSocket实时通信,本文主要介绍了vue2封装webSocket的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-08-08
  • 关于Vue3路由push跳转问题(解决Vue2this.$router.push失效)

    关于Vue3路由push跳转问题(解决Vue2this.$router.push失效)

    这篇文章主要介绍了Vue3路由push跳转问题(解决Vue2this.$router.push失效),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-07-07
  • Vue3处理错误边界(error boundaries)的示例代码

    Vue3处理错误边界(error boundaries)的示例代码

    在开发 Vue 3 应用时,处理错误边界(Error Boundaries)是一个重要的考量,在 Vue 3 中实现错误边界的方式与 React 等其他框架有所不同,下面,我们将深入探讨 Vue 3 中如何实现错误边界,并提供一些示例代码帮助理解什么是错误边界,需要的朋友可以参考下
    2024-10-10
  • Vue Echarts渲染数据导致残留脏数据的问题处理

    Vue Echarts渲染数据导致残留脏数据的问题处理

    这篇文章主要介绍了Vue Echarts渲染数据导致残留脏数据的问题处理,文中通过代码示例给大家讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-08-08
  • vue混入mixin使用特点

    vue混入mixin使用特点

    混入(mixin)提供了一种非常灵活的方式,来分发vue组件中的可复用功能。一个混入对象可以包含任意组件选项。当组件使用混入对象时,所有混入对象的选项将被“混合”进入该组件本身的选项
    2022-12-12
  • vue3如何监听页面的滚动

    vue3如何监听页面的滚动

    这篇文章主要给大家介绍了关于vue3如何监听页面的滚动的相关资料,在vue中实现滚动监听和原生js无太大差异,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-07-07
  • Vue使用Props实现组件数据交互的示例代码

    Vue使用Props实现组件数据交互的示例代码

    在Vue中,组件的props属性用于定义组件可以接收的外部数据,这些数据来自父组件并传递给子组件,本文给大家介绍了Vue使用Props实现组件数据交互,文中有详细的代码示例供大家参考,需要的朋友可以参考下
    2024-06-06
  • Vue项目如何安装引入使用Vant

    Vue项目如何安装引入使用Vant

    Vant是一个专为移动端设计的轻量级组件库,自2017年开源以来,提供了Vue2、Vue3及多平台版本支持,安装Vant时需要注意版本兼容问题,Vue3项目应安装最新版Vant3,而Vue2项目则需安装Vant2,安装错误时,需卸载后重新安装正确版本
    2024-10-10
  • Vue+Element一步步实现动态添加Input_输入框案例

    Vue+Element一步步实现动态添加Input_输入框案例

    这篇文章主要介绍了Vue+Element一步步实现动态添加Input_输入框案例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-09-09
  • 一次在vue中使用post进行excel表下载的实战记录

    一次在vue中使用post进行excel表下载的实战记录

    最近遇到了需求,觉着有必要给大家总结下,这篇文章主要给大家介绍了关于一次在vue中使用post进行excel表下载的实战记录,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07

最新评论