C#实现事件总线的方法示例

 更新时间:2024年02月19日 10:16:20   作者:Archy_Wang_1  
事件总线是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制,本文主要介绍了C#实现事件总线的方法示例,具有一定的参考价值,感兴趣的可以了解一下

EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{

}

public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{
    Task HandleAsync(IEvent @event);

    void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{
    void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
    Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;

    void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{
    void Publish(TEvent @event);
    Task PublishAsync(TEvent @event) ;
    
    void AutoHandle();
}

public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>
    where TEvent: IEvent
{
    readonly IServiceProvider _servicesProvider = serviceProvider;

    private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();

    public void Publish(TEvent @event)
    {
        Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
        _eventChannel.Writer.WriteAsync(@event);
    }

    private CancellationTokenSource Cts { get; } = new();

    public void Cancel()
    {
        Cts.Cancel();
    }
    
    public async Task PublishAsync(TEvent @event)
    {
        await _eventChannel.Writer.WriteAsync(@event);
    }

    public void AutoHandle()
    {
        // 确保只启动一次
        if (!Cts.IsCancellationRequested) return;

        Task.Run(async () =>
        {
            while (!Cts.IsCancellationRequested)
            {
                var reader = await _eventChannel.Reader.ReadAsync();
                await HandleAsync(reader);
            }
        }, Cts.Token);
    }

    async Task HandleAsync(TEvent @event)
    {
        var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();

        if (handler is null)
        {
            throw new NullReferenceException($"No handler for event {@event.GetType().Name}");
        }
        try
        {
            await handler.HandleAsync(@event);
        }
        catch (Exception ex)
        {
            handler.HandleException( @event, ex);
        }
    }
}

LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
    private readonly IServiceProvider _serviceProvider = serviceProvider;

    private class ChannelKey
    {
        public required string Key { get; init; }
        public int Subscribers { get; set; }

        public override bool Equals(object? obj)
        {
            if (obj is ChannelKey key)
            {
                return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
            }

            return false;
        }

        public override int GetHashCode()
        {
            return 0;
        }
    }

    private Channel<IEvent> Rent(string channel)
    {
        _channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);

        if (value != null) return value;
        value = Channel.CreateUnbounded<IEvent>();
        _channels.TryAdd(new ChannelKey() { Key = channel }, value);
        return value;
    }

    private Channel<IEvent> Rent(ChannelKey channelKey)
    {
        _channels.TryGetValue(channelKey, out var value);
        if (value != null) return value;
        value = Channel.CreateUnbounded<IEvent>();
        _channels.TryAdd(channelKey, value);
        return value;
    }

    private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();

    private CancellationTokenSource Cts { get; } = new();

    public void Cancel()
    {
        Cts.Cancel();
        _channels.Clear();
        Cts.TryReset();
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
    }

    public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
    }

    public void OnSubscribe<TEvent>() where TEvent : IEvent
    {
        var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
                         new ChannelKey() { Key = typeof(TEvent).Name };
        channelKey.Subscribers++;

        Task.Run(async () =>
        {
            try
            {
                while (!Cts.IsCancellationRequested)
                {
                    var @event = await ReadAsync(channelKey);

                    var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
                    if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
                    try
                    {
                        await handler.HandleAsync((TEvent)@event);
                    }
                    catch (Exception ex)
                    {
                        handler.HandleException((TEvent)@event, ex);
                    }
                }
            }
            catch (Exception e)
            {
                throw new InvalidOperationException("Error on onSubscribe handler", e);
            }
        }, Cts.Token);
    }

    private async Task<IEvent> ReadAsync(string channel)
    {
        return await Rent(channel).Reader.ReadAsync(Cts.Token);
    }

    private async Task<IEvent> ReadAsync(ChannelKey channel)
    {
        return await Rent(channel).Reader.ReadAsync(Cts.Token);
    }
}

LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{

}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
    private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
    
    
    public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            EventBusPool.Publish(@event);
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
            manager.Publish(@event);
        }
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            await EventBusPool.PublishAsync(@event);
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
            await manager.PublishAsync(@event);
        }
    }

    public void OnSubscribe<TEvent>() where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            EventBusPool.OnSubscribe<TEvent>();
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
            manager.AutoHandle();
        }
    }
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

到此这篇关于C#实现事件总线的方法示例的文章就介绍到这了,更多相关C# 事件总线内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:

相关文章

  • C# Winform多屏幕多显示器编程技巧实例

    C# Winform多屏幕多显示器编程技巧实例

    这篇文章主要介绍了C# Winform多屏幕多显示器编程技巧实例,本文直接给出代码实例,需要的朋友可以参考下
    2015-06-06
  • C#通过NPOI操作Excel的实例代码

    C#通过NPOI操作Excel的实例代码

    C#操作Excel的方法有很多种,本文介绍了C#通过NPOI操作Excel,具有一定的参考价值,有兴趣的可以了解一下。
    2017-01-01
  • .NET WinForm实现在listview中添加progressbar的方法

    .NET WinForm实现在listview中添加progressbar的方法

    这篇文章主要介绍了.NET WinForm实现在listview中添加progressbar的方法,结合实例形式简单分析了进度条控件的添加与使用方法,需要的朋友可以参考下
    2017-05-05
  • 利用C#快速查出哪些QQ好友空间屏蔽了自己

    利用C#快速查出哪些QQ好友空间屏蔽了自己

    我们经常会遇到以下情况吧:想点击好友空间看看他最近的动态,结果发现自己需要申请权限!别担心,本文将为大家介绍如何利用C#快速查出哪些QQ好友空间屏蔽了自己,需要的可以参考一下
    2022-02-02
  • c#添加Newtonsoft.Json包的操作

    c#添加Newtonsoft.Json包的操作

    这篇文章主要介绍了c#添加Newtonsoft.Json包的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • C#使用AngleSharp库解析html文档

    C#使用AngleSharp库解析html文档

    这篇文章介绍了C#使用AngleSharp库解析html文档的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-06-06
  • C#窗体编程(windows forms)禁止窗口最大化的方法

    C#窗体编程(windows forms)禁止窗口最大化的方法

    这篇文章主要介绍了C#窗体编程(windows forms)禁止窗口最大化的方法,以及避免弹出系统菜单和禁止窗口拖拽的方法,需要的朋友可以参考下
    2014-08-08
  • C#中的ICustomFormatter及IFormatProvider接口用法揭秘

    C#中的ICustomFormatter及IFormatProvider接口用法揭秘

    这篇文章主要介绍了C#中的ICustomFormatter及IFormatProvider接口用法揭秘,本文能过分析一段代码得出一些研究结果,需要的朋友可以参考下
    2015-06-06
  • unity实现车方向盘转动效果

    unity实现车方向盘转动效果

    这篇文章主要为大家详细介绍了unity实现车方向盘转动效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-04-04
  • C#如何自定义线性节点链表集合

    C#如何自定义线性节点链表集合

    C#如何自定义线性节点链表集合,这篇文章主要为大家详细介绍了C#基于泛型的自定义线性节点链表集合示例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-07-07

最新评论