C#使用RabbitMQ发送和接收消息工具类的实现

 更新时间:2023年12月19日 11:36:48   作者:让梦想疯狂  
RabbitMQ是一个消息的代理器,用于接收和发送消息,本文主要介绍了C#使用RabbitMQ发送和接收消息工具类的实现,具有一定的参考价值,感兴趣的可以了解一下

下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码:

工具类

通过NuGet安装RabbitMQ.Client

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class RabbitMQHelper : IDisposable
    {
        private readonly ConnectionFactory _factory;
        private IConnection _connection;
        private IModel _channel;
        public RabbitMQHelper()
        {
            // 设置连接参数
            _factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="message"></param>
        public void SendMessage<T>(string queueName, T message)
        {
            try
            {
                InitConnection();

                // 声明队列
                _channel.QueueDeclare(queue: queueName,
                    durable: true,// 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                _channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to send message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="messageHandler"></param>
        public void ReceiveMessage<T>(string queueName, Action<T> messageHandler)
        {
            try
            {
                InitConnection();

                // 声明队列(接收需声明队列,否则队列不存在时,无法接收消息)
                _channel.QueueDeclare(queue: queueName,
                    durable: true, // 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                //设置消费者数量(并发度),每个消费者每次只能处理一条消息
                _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // 创建消费者
                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());

                        var convertedMessage = JsonConvert.DeserializeObject<T>(message);

                        //委托方法
                        messageHandler.Invoke(convertedMessage);

                        // 消息处理成功,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        // 消息处理异常,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                };

                _channel.BasicConsume(queue: queueName,
                    autoAck: false,// 设置为true表示自动确认消息
                    consumer: consumer);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to receive message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 初始化链接
        /// </summary>
        private void InitConnection()
        {
            if (_connection == null || !_connection.IsOpen)
            {
                _connection = _factory.CreateConnection();
                _channel = _connection.CreateModel();
            }
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            _connection?.Close();
            _connection?.Dispose();
        }
    }
}

使用示例

using System;
using System.Text;
using System.Threading.Tasks;
using WorkerService1;

public class Program
{
    private static string QueueName = "myqueue_key";
    public static void Main()
    {

        var rabbitMQHelper = new RabbitMQHelper();
        for (long i = 0; i < 30; i++)
        {
            rabbitMQHelper.SendMessage(QueueName, i);
        }

        rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle);

        Console.ReadLine();
    }

    /// <summary>
    /// 接收处理
    /// </summary>
    /// <param name="index"></param>
    private static void ReceivedHandle(long index)
    {
        try
        {
            Console.WriteLine($"第{index}次开始{DateTime.Now}");
            Thread.Sleep(2000);
            Console.WriteLine($"第{index}次结束{DateTime.Now}");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

到此这篇关于C#使用RabbitMQ发送和接收消息工具类的实现的文章就介绍到这了,更多相关C# RabbitMQ发送和接收内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

最新评论