C#MQTT协议服务器与客户端通讯实现(客户端包含断开重连模块)

 更新时间:2025年07月01日 11:10:04   作者:风,停下  
本文主要介绍了C#MQTT协议服务器与客户端通讯实现,包括客户端包含断开重连模块,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1 DLL版本

MQTTnet.DLL版本-2.7.5.0
基于比较老的项目中应用的DLL,其他更高版本变化可能较大,谨慎参考。

2 服务器

开启服务器
关闭服务器
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端订阅主题事件】
绑定事件【客户端退订主题事件】
绑定事件【接收客户端(发送)消息事件】

using System;
using System.Net;
using MQTTnet;
using MQTTnet.Server;

namespace Demo_MQTT.Model
{
    public class ServerModel
    {
        private static MqttServer _mqttServer = null;


        private readonly Action<string> _callbackLog;

        public ServerModel(Action<string> callbackLog)
        {
            _callbackLog = callbackLog;
        }

        /// <summary>
        /// 绑定客户端连接服务器事件
        /// </summary>
        private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
        {
            WriteLog($"客户端[{e.Client.ClientId}]已连接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
        }

        /// <summary>
        /// 绑定客户端断开连接事件
        /// </summary>
        private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            WriteLog($"客户端[{e.Client.ClientId}]已断开连接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
        }

        /// <summary>
        /// 绑定客户端订阅主题事件
        /// </summary>
        private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e)
        {
            WriteLog($">>> 客户端{e.ClientId}订阅主题{e.TopicFilter.Topic}");
        }

        /// <summary>
        /// 绑定客户端退订主题事件
        /// </summary>
        /// <param name="e"></param>
        private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e)
        {
            WriteLog($">>> 客户端{e.ClientId}退订主题{e.TopicFilter}");

        }

        /// <summary>
        /// 绑定接收客户端消息事件
        /// </summary>
        private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            WriteLog($"接收到{e.ClientId}发送来的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}");
        }

        private void WriteLog(string log)
        {
            _callbackLog?.Invoke(log);
        }

        /// <summary>
        /// 开启服务器
        /// </summary>
        /// <param name="ip">IP地址</param>
        /// <param name="port">端口号</param>
        public void StartServer(string ip, int port)
        {
            if (_mqttServer == null)
            {
                var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))
                    .WithConnectionBacklog(1000)
                    .WithDefaultEndpointPort(port);

                IMqttServerOptions options = optionsBuilder.Build();

                try
                {
                    _mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
                    _mqttServer.ClientConnected += MqttServer_ClientConnected;
                    _mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
                    _mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;

                    _mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic;
                    _mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic;

                    _mqttServer.StartAsync(options);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    return;
                }

                WriteLog($"MQTT服务器启动成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");
            }
        }

        /// <summary>
        /// 关闭服务器
        /// </summary>
        public void CloseServer()
        {
            _mqttServer?.StopAsync();
        }
    }
}

3 客户端

连接服务器
属性:客户端连接状态
客户端断开重连线程
获取所有订阅主题
订阅主题
退订主题
发送消息
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端接收消息事件】

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace Demo_MQTT.Model
{
    public class ClientModel
    {
        /// <summary>
        /// 记录所有订阅主题,用于断开重连时重新订阅主题
        /// </summary>
        private readonly List<string> _subscribeTopics = new List<string>();
        private MqttClient _mqttClient = null;
        private string _serverIp;
        private int _nServerPort;
        private bool _isRunningReConnectThreadStart = false;
        private string _clienID;

        /// <summary>
        /// 接受消息回调函数,参数:主题,消息内容
        /// </summary>
        private readonly Action<string, byte[]> _callbackReceived;
        private readonly Action<string> _callbackLog;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="callbackReceived">接受消息回调函数,参数:主题,消息内容</param>
        /// <param name="callbackLog"></param>
        public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog)
        {
            _callbackReceived = callbackReceived;
            _callbackLog = callbackLog;
        }

        /// <summary>
        /// 连接服务器
        /// </summary>
        private async void ConnectServer()
        {
            try
            {
                if (_mqttClient == null)
                {
                    _mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;
                    _mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已连接到MQTT服务器!");
                    _mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已断开MQTT连接!");
                    _mqttClient.ApplicationMessageReceived += (sender, args) =>
                    {
                        _callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload);
                    };
                }
                if (_mqttClient.IsConnected) return;

                IMqttClientOptions options = new MqttClientOptions
                {
                    ChannelOptions = new MqttClientTcpOptions()
                    {
                        Server = _serverIp,
                        Port = _nServerPort
                    },
                    CleanSession = true
                };

                _clienID = options.ClientId;
                await _mqttClient.ConnectAsync(options);
                if (_mqttClient.IsConnected)
                {
                    ReConnectThreadStart();
                    SubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                WriteLog("连接到MQTT服务器失败!");
            }
        }

        /// <summary>
        /// 客户端重连服务器线程-启动
        /// </summary>
        /// <returns></returns>
        private void ReConnectThreadStart()
        {
            if (_isRunningReConnectThreadStart) return;

            if (_mqttClient != null)
            {
                new Task(() =>
                {
                    _isRunningReConnectThreadStart = true;
                    Thread.Sleep(5000);
                    while (true)
                    {
                        Thread.Sleep(1000);
                        if (!IsConnect)
                        {
                            WriteLog($"客户端[{_clienID}]断开连接,尝试重新连接服务器中...");
                            int i;
                            for (i = 0; i < 60; i++)
                            {
                                if (IsConnect) break;
                                WriteLog($"尝试第{i + 1}次连接服务器");
                                ConnectServer();
                                Thread.Sleep(1000);
                                if (IsConnect) break;
                            }
                            _isRunningReConnectThreadStart = i < 60;
                        }

                        if (!_isRunningReConnectThreadStart) break;
                    }

                }).Start();
            }
        }

        private void WriteLog(string log)
        {
            _callbackLog?.Invoke(log);
        }


        /// <summary>
        /// 客户端连接状态
        /// </summary>
        public bool IsConnect => _mqttClient?.IsConnected == true;

        /// <summary>
        /// 连接服务器
        /// </summary>
        /// <param name="serverIp">服务器IP</param>
        /// <param name="serverPort">服务器端口</param>
        /// <param name="topic"></param>
        public async void ConnectServer(string serverIp, int serverPort)
        {
            _serverIp = serverIp;
            _nServerPort = serverPort;

            await Task.Run(() => { ConnectServer(); });
        }

        /// <summary>
        /// 关闭客户端,断开客户端和服务器的连接
        /// </summary>
        public void CloseClient()
        {
            _mqttClient.DisconnectAsync();
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="topic">发送主题</param>
        /// <param name="cmd">发送内容</param>
        [Obsolete("Obsolete")]
        public void PublishAsync(string topic, string cmd)
        {
            var bytes = Encoding.UTF8.GetBytes(cmd);
            var mode = MqttQualityOfServiceLevel.AtMostOnce;
            var appMsg = new MqttApplicationMessage(topic, bytes, mode, false);
            _mqttClient.PublishAsync(appMsg);//发送消息
        }

        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="topics">主题标识</param>
        public void SubscribeAsync(params string[] topics)
        {
            foreach (var topic in topics)
            {
                if (!_subscribeTopics.Contains(topic))
                {
                    _subscribeTopics.Add(topic);
                }
            }
            var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
            _mqttClient?.SubscribeAsync(topicFilters);
        }

        /// <summary>
        /// 退订已订阅主题
        /// </summary>
        /// <param name="topics">主题标识</param>
        public void UnSubscribeAsync(params string[] topics)
        {
            if (topics == null || topics.Length == 0) return;
            var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();
            _mqttClient.SubscribeAsync(topicFilters);
        }

        /// <summary>
        /// 获取所有订阅主题
        /// </summary>
        public string[] GetAllTopic => _subscribeTopics.ToArray();

    }
}

到此这篇关于C#MQTT协议服务器与客户端通讯实现(客户端包含断开重连模块)的文章就介绍到这了,更多相关C#中MQTT服务器与客户端通讯内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C#如何判断.Net Framework版本是否满足软件运行需要的版本

    C#如何判断.Net Framework版本是否满足软件运行需要的版本

    这篇文章主要介绍了C#如何判断.Net Framework版本是否满足软件运行需要的版本问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01
  • C#使用EasyModbus进行通讯的实现示例

    C#使用EasyModbus进行通讯的实现示例

    EasyModbus是一个流行的.NET库,用于实现Modbus TCP、RTU和UDP协议通信,本文就来介绍一下C#使用EasyModbus进行通讯的实现示例,感兴趣的可以了解一下
    2026-01-01
  • C#面向对象编程中接口隔离原则的示例详解

    C#面向对象编程中接口隔离原则的示例详解

    在面向对象编程中,SOLID 是五个设计原则的首字母缩写,旨在使软件设计更易于理解、灵活和可维护。本文将通过实例详细讲讲C#面向对象编程中接口隔离原则,需要的可以参考一下
    2022-07-07
  • 怎么利用c#修改services的Startup type

    怎么利用c#修改services的Startup type

    C#必须以管理员的权限运行才能达到效果的,不然service的startmode修改是没有效果的
    2013-08-08
  • C#实现简易画图板的示例代码

    C#实现简易画图板的示例代码

    这篇文章主要介绍了C#实现简易画图板的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04
  • 详解C# 虚方法virtual

    详解C# 虚方法virtual

    这篇文章主要介绍了C# 虚方法virtual的相关资料,帮助大家更好的理解和学习使用c#,感兴趣的朋友可以了解下
    2021-04-04
  • C#计算器编写代码

    C#计算器编写代码

    这篇文章主要为大家分享了C#计算器编写代码,供大家参考,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-07-07
  • C#文件操作类分享

    C#文件操作类分享

    这篇文章主要为大家分享了C#文件操作类的相关代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-06-06
  • C# File类中的文件读写方法详解

    C# File类中的文件读写方法详解

    C#提供了多种操作文件的方案,尤其是File类中封装的静态方法,本文将通过一些简单的示例为大家讲讲C#读写文件的方法,需要的可以参考一下
    2023-05-05
  • Unity 如何获取鼠标停留位置下的物体

    Unity 如何获取鼠标停留位置下的物体

    这篇文章主要介绍了Unity 如何获取鼠标停留位置下的物体,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04

最新评论