C#基于共享内存实现跨进程队列

 更新时间:2024年07月16日 10:57:42   作者:CodeOfCC  
进程通信一般情况下比较少用,但是也有一些使用场景,有些做视频传输的似乎会用多进程来实现,还有在子进程中调用特定的库来避免内存泄漏,笔者最近也遇到了需要使用多进程的场景,本文介绍了C#基于共享内存实现跨进程队列,需要的朋友可以参考下

前言

进程通信一般情况下比较少用,但是也有一些使用场景,有些做视频传输的似乎会用多进程来实现,还有在子进程中调用特定的库来避免内存泄漏,笔者最近也遇到了需要使用多进程的场景。多进程的使用最主要的就是进程间的通信,本文参考了go语言的ipc库,实现了一个基于共享内存的跨进程队列。

一、实现原理

1、用到的主要对象

//共享内存管理对象
MemoryMappedFile _mmf;
//跨进程的互斥变量
Mutex _mtx;
//入队信号量
Semaphore _semaEq;
//出队信号量
Semaphore _semaDq;

2、创建共享内存

创建共享内存需要使用MemoryMappedFile.CreateFromFile实现跨平台。CreateNew只能创建无法打开第二个,OpenExisting只支持windows。

string name="共享内存标识名称";
_shmPath="共享内存文件路径"+name;
//通过文件路径创建共享内存
_mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, (_QueuetHeaderSize + (elementBodyMaxSize + _ElementHeaderSize) * capacity), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false);
//创建互斥变量
_mtx = new Mutex(false, Name + ".mx");
//创建入队信号量,capacity为队列元素个数容量
_semaEq = new Semaphore(0, (int)capacity, name+ ".eq");
//创建出队信号量,capacity为队列元素个数容量
_semaDq = new Semaphore((int)capacity, (int)capacity, name + ".dq");

获取读写对象

_mmva = _mmf.CreateViewAccessor();

值类型数组方式写入

T[] obj;
_mmva.WriteArray<T>(position , obj, 0, obj.Length);

3、头部信息

采用循环队列方式实现,判断队空队满通过count、capacity的方式(参考了C#的Queue源码),避免占用多一个空间。

struct QueueHeader
{
    //元素大小
    public nint ElementSize;
    //队列容量
    public nint Capacity;
    //当前元素个数
    public nint Count;
    //队列头
    public nint Front;
    //队列尾
    public nint Rear;
}

队列头信息需要存储在共享内存中。

QueueHeader Header
{
    get
    {
        QueueHeader header;
        _mmva.Read(0, out header);
        return header;
    }
    set
    {
        _mmva.Write(0, ref value);
    }
}

4、入队

示例如下

bool Enqueuee<T>(T[] obj) where T : struct
{    
    //共享内存中读取header
    var header = Header;
    //队列满返回
    if (header.Count == header.Capacity) return false;
    //计算写入的位置,头部长度+队尾*元素大小
    nint position = _QueuetHeaderSize + header.Rear * header.ElementSize;
    //写入共享内存
    _mmva.WriteArray<T>(position, obj, 0, obj.Length);
    //更新队尾
    header.Rear = (header.Rear + 1) % header.Capacity;
    //更新长度
    header.Count++;
    //更新头部信息到共享内存
    Header=header;
    return true;
}

同步

 //等待出队信号量(如果队列满则会等待)
 if (!_semaDq.WaitOne(timeout)) return false;
 //进入互斥锁
 if (!_mtx.WaitOne(timeout)) return false;
 try
 {   
     //入队
     Enqueue(obj);
 }
 finally
 {
     //通知入队信号量
     _semaEq.Release();
     //释放互斥锁
     _mtx.ReleaseMutex();
 }
 return true;

5、出队

object Dequeue()
{   
    //共享内存中读取header
    var header = Header;
    //队列空则返回
    if (header.Count == 0) return null;
    //计算读取的位置,头部长度+队头*元素大小
    long position = _QueuetHeaderSize + header.Front * header.ElementSize;
    //创建数据用于装载数据
    Array arr = Array.CreateInstance(readType, msg.Header.ArrayLength);
    //将泛型转type调用。
    var readArray = _ReadArrayGeneric.MakeGenericMethod(readType);
    //读取共享内存的数据
    readArray.Invoke(_mmva, [position , arr, 0, arr.Length]);
    //更新队头
    header.Front = (header.Front + 1) % header.Capacity;
    //更新长度
    header.Count--;
    //更新头部信息到共享内存
    Header = header;
    return msg;
}

同步

 //等待入队信号量(如果队列空则会等待)
if (!_semaEq.WaitOne(timeout)) return null;
 //进入互斥锁
if (!_mtx.WaitOne(timeout!)) return null;
try
{ 
   //出队
    return  Dequeue();
}
finally
{
     //通知入队信号量
    _semaDq.Release();
     //释放互斥锁
    _mtx.ReleaseMutex();
}

6、释放资源

/// <summary>
/// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
/// </summary>
public void Dispose()
{
    _mmf.Dispose();
    _mmva.Dispose();
    _mtx.Dispose();
    _semaEq.Dispose();
    _semaDq.Dispose();
}

二、完整代码

类的定义

/// <summary>
/// 共享队列
/// 基于共享内存实现
/// </summary>
class SharedQueue : IDisposable
{
    /// <summary>
    /// 名称
    /// </summary>
    public string Name { get; private set; }
    /// <summary>
    /// 元素最大大小
    /// </summary>
    public long ElementMaxSize { get; private set; }
    /// <summary>
    /// 队列容量
    /// </summary>
    public long Capacity { get; private set; }
    /// <summary>
    /// 表示是否新创建,是则是创建,否则是打开已存在的。
    /// </summary>
    public bool IsNewCreate { get; private set; }
    /// <summary>
    /// 构造方法
    /// </summary>
    /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个队列,可以进行数据传输。</param>
    /// <param name="capacity">队列容量,元素个数总量</param>
    /// <param name="elementBodyMaxSize">队列元素最大大小,此大小需要考虑传输数据Type.FullName长度</param>
    public SharedQueue(string name, nint capacity = 1, nint elementBodyMaxSize = 3145728);
    /// <summary>
    /// 发送数据
    /// </summary>
    /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。
    /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。
    /// 此方法队列满了会阻塞,直到发送成功才返回。
    /// </param>
    /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
    public void Send(object obj, bool isForceSerialize = false);
    /// <summary>
    /// 接收数据
    /// 此方法队列空会阻塞,直到有数据才返回。
    /// </summary>
    /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换</returns>
    public object Receive();
    /// <summary>
    /// 发送数据超时
    /// </summary>
    /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。
    /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。</param>
    /// <param name="timeout">超时时长</param>
    /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
    /// <returns>true发送成功,false超时</returns>
    public bool SendTimeout(object obj, TimeSpan timeout, bool isForceSerialize = false);
    /// <summary>
    /// 接收超时
    /// </summary>
    /// <param name="timeout">超时时长</param>
    /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换。
    /// 超时返回null。
    /// </returns>
    public object? ReceiveTimeout(TimeSpan timeout);
    /// <summary>
    /// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
    /// </summary>
    public void Dispose();
}

三、使用示例

1、传输byte[]数据

进程a

SharedQueue shq= new SharedQueue("shq1", 10);
byte[] a = new byte[5] { 1, 2, 3, 4, 5 };
//发送数据
shq.send(a);

进程b

SharedQueue shq= new SharedQueue("shq1", 10);
//接收数据
var a=shq.Receive() as byte[];
Console.Write("receive: ");
foreach (var i in a)
{
    Console.Write(i);
}

2、传输字符串

进程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
shq.send("12345");

进程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as string;
Console.WriteLine("receive: " + a);

3、传输对象

class A
{
    public string Name;
    public int Number;
}

进程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
sq.Send(new A() { Name = "Tommy", Number = 102185784 });

进程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as A;
Console.WriteLine("receive: " + a.Name + " " + a.Number);

总结

以上就是今天要讲的内容,实现这样的一个对象,虽然代码量不多,但还是有一点难度的,很多细节需要处理,比如泛型转type以统一接口,信号量实现队列和条件变量是有差异的,用CreateFromFile才能实现跨平台。总的来说,有了这样的一个队列,跨线程通信就变的比较方便且高效了。

到此这篇关于C#基于共享内存实现跨进程队列的文章就介绍到这了,更多相关C#跨进程队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • c#实现适配器模式的项目实践

    c#实现适配器模式的项目实践

    适配器模式将一个类的接口转换成客户希望的另一个接口,使得原本由于接口不兼容而不能一起工作的那些类可以一起工作,本文主要介绍了c#实现适配器模式的项目实践,感兴趣的可以一起来了解一下
    2023-08-08
  • C#中实现多继承的方法

    C#中实现多继承的方法

    这篇文章主要介绍了C#中实现多继承的方法,本文通过给接口添加扩展的方法实现了C#的多继承,需要的朋友可以参考下
    2014-08-08
  • C# 6.0 新特性汇总

    C# 6.0 新特性汇总

    这篇文章主要介绍了C# 6.0 新特性汇总的相关资料,本文给大家带来了11种新特征,非常不错,感兴趣的朋友一起看看吧
    2016-09-09
  • Unity实现物体左右移动效果

    Unity实现物体左右移动效果

    这篇文章主要为大家详细介绍了Unity实现物体左右移动效果,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-08-08
  • Unity实现简单的虚拟摇杆

    Unity实现简单的虚拟摇杆

    这篇文章主要为大家详细介绍了Unity实现简单的虚拟摇杆,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-04-04
  • C# 委托与 Lambda 表达式转换机制及弱事件模式下的生命周期详解

    C# 委托与 Lambda 表达式转换机制及弱事件模式下的生命周期详解

    本文介绍了C#委托和Lambda表达式的工作原理,包括委托的内部结构、Lambda表达式的转换机制以及弱事件模式下的生命周期管理,感兴趣的朋友一起看看吧
    2025-02-02
  • C#圆形头像框制作并从数据库读取

    C#圆形头像框制作并从数据库读取

    本文主要介绍了C#圆形头像框制作并从数据库读取,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • c#如何使用 XML 文档功能

    c#如何使用 XML 文档功能

    这篇文章主要介绍了c#如何使用 XML 文档功能,帮助大家更好的理解和使用c#,感兴趣的朋友可以了解下
    2020-10-10
  • 使用C#实现在word中插入页眉页脚的方法

    使用C#实现在word中插入页眉页脚的方法

    这篇文章主要介绍了使用C#实现在word中插入页眉页脚的方法,是操作Word的常见方法,有一定的学习借鉴价值,需要的朋友可以参考下
    2014-08-08
  • c# 几种常见的加密方法的实现

    c# 几种常见的加密方法的实现

    这篇文章主要介绍了c# 几种常见的加密方法的实现,帮助大家更好的理解和使用c#,感兴趣的朋友可以了解下
    2020-12-12

最新评论