C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

 更新时间:2020年10月17日 09:05:31   作者:做自己518  
这篇文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1:RabbitMQ是个啥?(专业术语参考自网络)

 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

  RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

2:使用RabbitMQ有啥好处?

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。

对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。

RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,

3:RabbitMq的安装以及环境搭建等:

网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!

3.1:运行容器的命令如下:

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?

4.1什么时候使用MQ?

1)数据驱动的任务依赖

2)上游不关心多下游执行结果

3)异步返回执行时间长

4.2什么时候不使用MQ?

需要实时关注执行结果 (eg:同步调用)

5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)

6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:

Code:

//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {
        //就是简单的队列,生产者
        Console.WriteLine("====RabbitMqPublishDemo====");
        for (int i = 0; i < 500; i++)
        {
          ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
        }
        Console.WriteLine("生成完毕!");
        Console.ReadLine();
    }
  }
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

  using (IConnection conn = connectionFactory.CreateConnection())
  {
    using (IModel channel = conn.CreateModel())
    {
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var msgBody = Encoding.UTF8.GetBytes(msg);
      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
    }
  }
}


//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;

  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====RabbitMqConsumerDemo====");
      ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
      {
        Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
      });
      Console.ReadLine();
    }
  }
}

   #region 简单生产者后端逻辑
    /// <summary>
    /// 简单消费者
    /// </summary>
    /// <param name="queueName">队列名称</param>
    /// <param name="isBasicNack">失败后是否自动放到队列</param>
    /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
    public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
    {
      Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
      IConnection conn = connectionFactory.CreateConnection();
      IModel channel = conn.CreateModel();
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (sender, ea) =>
      {
        byte[] bymsg = ea.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bymsg);
        if (handleMsgStr != null)
        {
          handleMsgStr.Invoke(msg);
        }
        else
        {
          Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
        }
      };
      channel.BasicConsume(queueName, autoAck: true, consumer);
    }
    #endregion

7:Work模式

//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {
        //就是简单的队列,生产者
        Console.WriteLine("====RabbitMqPublishDemo====");
        for (int i = 0; i < 500; i++)
        {
          ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
        }
        Console.WriteLine("生成完毕!");
        Console.ReadLine();
    }
  }
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

  using (IConnection conn = connectionFactory.CreateConnection())
  {
    using (IModel channel = conn.CreateModel())
    {
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var msgBody = Encoding.UTF8.GetBytes(msg);
      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
    }
  }
}


//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;

  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====RabbitMqConsumerDemo====");
      ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
      {
        Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
      });
      Console.ReadLine();
    }
  }
}

   #region 简单生产者后端逻辑
    /// <summary>
    /// 简单消费者
    /// </summary>
    /// <param name="queueName">队列名称</param>
    /// <param name="isBasicNack">失败后是否自动放到队列</param>
    /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
    public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
    {
      Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
      IConnection conn = connectionFactory.CreateConnection();
      IModel channel = conn.CreateModel();
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (sender, ea) =>
      {
        byte[] bymsg = ea.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bymsg);
        if (handleMsgStr != null)
        {
          handleMsgStr.Invoke(msg);
        }
        else
        {
          Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
        }
      };
      channel.BasicConsume(queueName, autoAck: true, consumer);
    }
    #endregion

8:Fanout

Code:

//就如下的code, 多次生产,3个消费者都可以自动开始消费

//生产者
using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;
  class Program
  {
    static void Main(string[] args)
    {
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
      }
      Console.WriteLine("工作队列模式 生成完毕......!");
      Console.ReadLine();
    }
  }
}

//生产者后端逻辑
public static void PublishWorkQueueModel(string queueName, string msg)
    {
      using (var connection = connectionFactory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(msg);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
        Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
      }
    }

//work消费端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;
  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====Work模式开启了====");
      ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
      {
        Console.WriteLine($"work模式获取到消息{msg}");
      });
      Console.ReadLine();
    }
  }
}

//work后端逻辑
    public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
    {
      var connection = connectionFactory.CreateConnection();
      var channel = connection.CreateModel();

      channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
      channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

      var consumer = new EventingBasicConsumer(channel);
      Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");

      consumer.Received += (sender, ea) =>
      {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (handserMsg != null)
        {
          if (!string.IsNullOrEmpty(message))
          {
            handserMsg.Invoke(message);
          }
        }
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
      };
      channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

9:Direct

Code:

//同一个消息会被多个订阅者消费

//发布者
using System;

namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {

      #region 发布订阅模式,带上了exchange
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
      }
      Console.WriteLine("发布ok!");
      #endregion
      Console.ReadLine();
    }
  }
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
  public static void PublishExchangeModel(string exchangeName, string message)
    {
      using (var connection = connectionFactory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
        Console.WriteLine($" Sent {message}");
      }
    }


//订阅者
using System;
namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;
  class Program
  {
    static void Main(string[] args)
    {

      #region 发布订阅模式 Exchange
      ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
      {
        Console.WriteLine($"订阅到消息:{msg}");
      });
      #endregion
      Console.ReadLine();
    }
  }
}

//订阅者后端的逻辑
 public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
    {
      var connection = connectionFactory.CreateConnection();
      var channel = connection.CreateModel();

      channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉

      var queueName = channel.QueueDeclare().QueueName;
      channel.QueueBind(queue: queueName,
               exchange: exchangeName,
               routingKey: "");

      Console.WriteLine(" Waiting for msg....");

      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (model, ea) =>
      {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (handlerMsg != null)
        {
          if (!string.IsNullOrEmpty(message))
          {
            handlerMsg.Invoke(message);
          }
        }
        else
        {
          Console.WriteLine($"订阅到消息:{message}");
        }
      };
      channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    }

到此这篇关于C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)的文章就介绍到这了,更多相关C#使用RabbitMq队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C#获取存储过程返回值和输出参数值的方法

    C#获取存储过程返回值和输出参数值的方法

    这篇文章主要介绍了C#获取存储过程返回值和输出参数值的方法,有需要的朋友可以参考一下
    2014-01-01
  • C# 窗口过程消息处理 WndProc的方法详解

    C# 窗口过程消息处理 WndProc的方法详解

    在WinForm中一般采用重写WndProc的方法对窗口或控件接受到的指定消息进行处理,本文给大家介绍C#窗口过程消息处理WndProc的方法详解,感兴趣的朋友一起看看吧
    2025-04-04
  • C#递归算法之快速排序

    C#递归算法之快速排序

    快速排序由C.A.R发明,它依据中心元素的值,利用一系列递归调用将数据表划分成越来越小的子表。在每一步调用中,经过多次的交换,最终为中心元素找到最终的位置。
    2016-06-06
  • C#实现单位换算器

    C#实现单位换算器

    这篇文章主要为大家详细介绍了C#实现单位换算器,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-01-01
  • 聚星C#数字信号处理工具包频谱分析的用法

    聚星C#数字信号处理工具包频谱分析的用法

    这篇文章主要介绍了聚星C#数字信号处理工具包频谱分析的用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • C#中sealed关键字的具体使用

    C#中sealed关键字的具体使用

    在C#中sealed关键字用于阻止类被继承或成员被重写,它可以与class一起使用,本文主要介绍了C#中sealed关键字的具体使用,具有一定的参考价值,感兴趣的可以了解一下
    2025-01-01
  • C#使用CefSharp和网页进行自动化交互的示例代码

    C#使用CefSharp和网页进行自动化交互的示例代码

    CefSharp 是一个用 C# 编写的开源库,它封装了 Google Chrome 浏览器的 Chromium 内核,CefSharp 允许开发者在其应用程序中嵌入浏览器功能,从而能够展示网页内容、执行JavaScript代码,本文给大家介绍了C#使用CefSharp和网页进行自动化交互,需要的朋友可以参考下
    2024-07-07
  • WPF利用ValueConverter实现值转换器

    WPF利用ValueConverter实现值转换器

    值转换器在WPF开发中是非常常见的,值转换器可以帮助我们很轻松地实现,界面数据展示的问题。本文将通过WPF ValueConverter实现简单的值转换器,希望对大家有所帮助
    2023-03-03
  • c#使用Unity粒子实现炮塔发射系统

    c#使用Unity粒子实现炮塔发射系统

    Unity自带粒子发射器、动画器、渲染器各两种,利用Unity的粒子系统制作一个炮塔发射系统,了解粒子系统,必须先了解每一个属性都代表了什么,之后才能根据这些原理来调整出自己满意的效果
    2022-04-04
  • 【C#基础】Substring截取字符串的方法小结(推荐)

    【C#基础】Substring截取字符串的方法小结(推荐)

    这篇文章主要介绍了Substring截取字符串方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-05-05

最新评论