深入多线程之:深入生产者、消费者队列分析

 更新时间:2013年05月14日 15:13:06   作者:  
本篇文章是对生产者与消费者队列进行了详细的分析介绍,需要的朋友参考下

上次我们使用AutoResetEvent实现了一个生产/消费者队列。这一次我们要使用Wait和Pulse方法来实现一个更强大的版本,它允许多个消费者,每一个消费者都在自己的线程中运行。

我们使用数组来跟踪线程。

Thread[] _workers;

通过跟踪线程可以让我们在所有的线程都结束后再结束我们的队列任务。

每一个消费者线程都执行一个叫做Consume的方法,在一个for循环中,我们可以创建和启动线程。例如:

复制代码 代码如下:

       public PCQueue(int workerCount)
        {
            _workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Consume)).Start();
        }

上次我们使用的是一个字符串来代表任务,这次我们使用Action委托,它的定义如下:

Public delegate void Action();

为了表示一系列的任务,我们使用Queue<T> 集合,例如:

Queue<Action> _itemQ = new Queue<Action>();

在我们调用生产(EnqueueItem)和消费(Consume)方法前,还是完整的看一看代码吧:

复制代码 代码如下:

class PCQueue
    {
        readonly object _locker = new object();
        Thread[] _workers;
        Queue<Action> _itemQ = new Queue<Action>(); //保存任务的队列
        public PCQueue(int workerCount)
        {
            _workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Consume)).Start();
        }

        public void Shutdown(bool waitForWorkers)
        {
           //为每一个线程插入一个null item,可以是每一个worker 退出
            foreach (Thread worker in _workers)
                EnqueueItem(null);

           //等待所有的线程退出。
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }

        public void EnqueueItem(Action item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); //通知等待队列中的线程
            }
        }

        void Consume()
        {
            while (true)
            {
                Action item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                    item = _itemQ.Dequeue();
                }

                if (item == null) return; //退出的信号
                item();
            }
        }
    }


我们可以有一个退出策略,插入一个null item作为consumer退出的信号。如果我们想要快速的退出,可以使用一个独立的”cancel” 标记,因为我们支持多个consumers,所以我们必须为每一个consumer插入一个null item。

下面是Main方法。使用两个consumer线程,然后让这两个consumers执行10个委托

复制代码 代码如下:

public static void Main()
        {
            PCQueue q = new PCQueue(2);
            Console.WriteLine("Enqueuing 10 items...");

            for (int i = 0; i < 10; i++)
            {
                int itemNumber = i;
                q.EnqueueItem(() =>
                    {
                        Thread.Sleep(1000); //模拟耗时的工作
                        Console.WriteLine(" Task " + itemNumber);
                    });
            }

            q.Shutdown(true); //等待关闭
            Console.WriteLine();
            Console.WriteLine("Workers complete!");
        }

下面让我们细致的看一看EnqueueItem方法:

复制代码 代码如下:

public void EnqueueItem(Action item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); //通知等待队列中的线程
            }
        }

因为我们的队列_itemQ被多线程环境使用,因此在对_itemQ进行读取的时候需要加锁lock.

因为我们插入了一个新的任务,我们必须修改阻塞条件,也就是调用pulse方法,来唤醒调用了wait方法的线程。


出于对效率的考虑,当插入一个Item的时候使用Pulse来代替PulseAll方法,因为大部分时候每一个Item只需要一个consumer来执行。如果你有一个冰淇淋,你不可能叫30个睡眠的孩子都起来吃它,同样,对于一个item,同时唤醒30个consumers一点好处都没有。


让我们再看看Consumer方法。

我们希望当没什么事情做的时候,线程阻塞就可以了,换句话说,队列中没有item的时候,线程就应该阻塞。因此我们的阻塞条件是_itemQ.Count ==0;

复制代码 代码如下:

Action item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                    item = _itemQ.Dequeue();
                }

                if (item == null) return; //退出的信号
                item();


while循环退出的时候也意味着_itemQ 至少有一个item。我们必须在释放锁之前调用你哦个dequeue方法来获取item,考虑下下面的代码:
复制代码 代码如下:

lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                }
                //现在在这里可能被抢占,_itemQ可能被修改
                lock (_locker)
                {
                    item = _itemQ.Dequeue();
                }

在item被Dequeued后,我们就应该立即释放锁了,如果我们在执行task的时候,一直持有锁,则会没有必要的阻塞其他线程来获取任务。


Wait Timeouts

在调用Wait方法的时候可以传递一个毫秒或Timespan的时间来设置超时。如果Wait超时了,那么Wait方法就会返回false。

带有超时功能的Wait方法的主要步骤:

    释放锁。
    阻塞 直到 pulsed 或者超时。
    重新获取锁。

超时就好像CLR 在超时到了的时候自动的调用了 pulse方法一样。

下面是使用超时的Wait的主要代码:

         lock(_locker)

         while(<阻塞条件>)

                   Monitor.Wait(_locker,<超时时间>);


Monitor.Wait 方法返回一个bool值来代表是调用了pulse还是已经超时了。

如果是true: 代表调用了pulse。

如果是false:代表超时了。

这对记录日志很有用。

相关文章

  • C#实现批量给图片添加水印的示例代码

    C#实现批量给图片添加水印的示例代码

    这篇文章主要为大家详细介绍了如何利用C#实现批量给图片添加水印的功能,文中的示例代码讲解详细,对我们学习C#有一定的帮助,感兴趣的小伙伴可以了解一下
    2022-12-12
  • C#判断单词个数方法总结

    C#判断单词个数方法总结

    我们给大家总计了C#中判断英文单词个数的方法以及排序的技巧,对此有需要的朋友可以测试下。
    2018-03-03
  • C# 使用 OleDbConnection 连接读取Excel的方法

    C# 使用 OleDbConnection 连接读取Excel的方法

    这篇文章主要介绍了C# 使用 OleDbConnection 连接读取Excel的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • C# 多态性的深入理解

    C# 多态性的深入理解

    本篇文章是对C#中的多态性进行了详细的分析介绍,需要的朋友参考下
    2013-06-06
  • 浅谈C# 字段和属性

    浅谈C# 字段和属性

    这篇文章主要介绍了C# 字段和属性的的相关资料,文中示例代码非常详细,供大家参考和学习,感兴趣的朋友可以了解下
    2020-06-06
  • C#数据类型转换(显式转型、隐式转型、强制转型)

    C#数据类型转换(显式转型、隐式转型、强制转型)

    本文详细讲解了C#数据类型转换(显式转型、隐式转型、强制转型),文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-01-01
  • C#根据年月日计算星期几的函数

    C#根据年月日计算星期几的函数

    这篇文章主要为大家详细介绍了C#实现根据年月日计算星期几的函数,感兴趣的小伙伴们可以参考一下
    2016-08-08
  • C#使用windows服务开启应用程序的方法

    C#使用windows服务开启应用程序的方法

    这篇文章主要介绍了C#使用windows服务开启应用程序的方法,实例分析了C#操作windows服务开启应用程序所遇到的问题及相关解决技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-09-09
  • C#快速实现IList非泛型类接口的自定义类作为数据源

    C#快速实现IList非泛型类接口的自定义类作为数据源

    本文主要介绍了C#快速实现IList非泛型类接口的自定义类作为数据源,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-02-02
  • C#验证给定字符串形式日期是否合法的方法

    C#验证给定字符串形式日期是否合法的方法

    这篇文章主要介绍了C#验证给定字符串形式日期是否合法的方法,实例分析了C#针对字符串及日期的操作技巧,非常具有实用价值,需要的朋友可以参考下
    2015-03-03

最新评论