C#  Task  TaskFactory  设置最大并行线程数的方法

 更新时间:2026年04月29日 08:21:39   作者:yuanpan  
本文深入探讨了LimitedConcurrencyLevelTaskScheduler的工作原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. LimitedConcurrencyLevelTaskScheduler 介绍

这个TaskScheduler用过的应该都知道,微软开源的一个任务调度器,它的代码很简单,
也很好懂,但是我没有明白的是他是如何实现限制并发数的
首先贴下它的代码,大家先熟悉一下。

public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    /// <summary>Whether the current thread is processing work items.</summary> 
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;
    /// <summary>The list of tasks to be executed.</summary> 
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) 
                                                                       /// <summary>The maximum concurrency level allowed by this scheduler.</summary> 
    private readonly int _maxDegreeOfParallelism;
    /// <summary>Whether the scheduler is currently processing work items.</summary> 
    private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) 
    /// <summary> 
    /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the 
    /// specified degree of parallelism. 
    /// </summary> 
    /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param> 
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }
    /// <summary>
    /// current executing number;
    /// </summary>
    public int CurrentCount { get; set; }
    /// <summary>Queues a task to the scheduler.</summary> 
    /// <param name="task">The task to be queued.</param> 
    protected sealed override void QueueTask(Task task)
    {
        // Add the task to the list of tasks to be processed. If there aren't enough 
        // delegates currently queued or running to process tasks, schedule another. 
        lock (_tasks)
        {
            Console.WriteLine("Task Count : {0} ", _tasks.Count);
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                ++_delegatesQueuedOrRunning;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }
    int executingCount = 0;
    private static object executeLock = new object();
    /// <summary> 
    /// Informs the ThreadPool that there's work to be executed for this scheduler. 
    /// </summary> 
    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items. 
            // This is necessary to enable inlining of tasks into this thread. 
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue. 
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // When there are no more items to be processed, 
                        // note that we're done processing, and get out. 
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;
                            break;
                        }
                        // Get the next item from the queue 
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }
                    // Execute the task we pulled out of the queue 
                    base.TryExecuteTask(item);
                }
            }
            // We're done processing items on the current thread 
            finally { _currentThreadIsProcessingItems = false; }
        }, null);
    }
    /// <summary>Attempts to execute the specified task on the current thread.</summary> 
    /// <param name="task">The task to be executed.</param> 
    /// <param name="taskWasPreviouslyQueued"></param> 
    /// <returns>Whether the task could be executed on the current thread.</returns> 
    protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // If this thread isn't already processing a task, we don't support inlining 
        if (!_currentThreadIsProcessingItems) return false;
        // If the task was previously queued, remove it from the queue 
        if (taskWasPreviouslyQueued) TryDequeue(task);
        // Try to run the task. 
        return base.TryExecuteTask(task);
    }
    /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> 
    /// <param name="task">The task to be removed.</param> 
    /// <returns>Whether the task could be found and removed.</returns> 
    protected sealed override bool TryDequeue(Task task)
    {
        lock (_tasks) return _tasks.Remove(task);
    }
    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> 
    public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
    /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> 
    /// <returns>An enumerable of the tasks currently scheduled.</returns> 
    protected sealed override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (lockTaken) return _tasks.ToArray();
            else throw new NotSupportedException();
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_tasks);
        }
    }
}

简单使用

下面是调用代码。

static void Main(string[] args)
{
        TaskFactory fac = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(5));
        //TaskFactory fac = new TaskFactory();
        for (int i = 0; i < 1000; i++)
        {
            fac.StartNew(s => {
                Thread.Sleep(1000);
                Console.WriteLine("Current Index {0}, ThreadId {1}",s,Thread.CurrentThread.ManagedThreadId);
            }, i);
        }
        Console.ReadKey();
}

调用很简单
根据调试调用顺序可以知道。
使用 LimitedConcurrencyLevelTaskScheduler 创建好TaskFactory 后,
调用该TaskFacotry.StartNew 方法后。会进入 LimitedConcurrencyLevelTaskScheduler 的
QueueTask 方法。

/// <summary>Queues a task to the scheduler.</summary> 
    /// <param name="task">The task to be queued.</param> 
    protected sealed override void QueueTask(Task task)
    {
        // Add the task to the list of tasks to be processed. If there aren't enough 
        // delegates currently queued or running to process tasks, schedule another. 
        lock (_tasks)
        {
            Console.WriteLine("Task Count : {0} ", _tasks.Count);
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                ++_delegatesQueuedOrRunning;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }
    

代码很简单,把刚创建的Task 添加到任务队列中去,然后判断当前正在执行的任务数量与设置的允许最大并发数进行比较, 如果小于该值,则开始通知正在挂起的任务开始执行。
我的疑问主要在 NotifyThreadPoolOfPendingWork 这个方法上。

private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items. 
            // This is necessary to enable inlining of tasks into this thread. 
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue. 
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // When there are no more items to be processed, 
                        // note that we're done processing, and get out. 
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;
                            break;
                        }
                        // Get the next item from the queue 
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }
                    // Execute the task we pulled out of the queue 
                    base.TryExecuteTask(item);
                }
            }
            // We're done processing items on the current thread 
            finally { _currentThreadIsProcessingItems = false; }
        }, null);
    }

从代码中看到的意思是一直跑一个死循环, 不断从_tasks 中取出Task执行,
直到_task为空为止,然后退出循环。从这里并没有看到限制并发数的限制,只有在QueueTask中调用的时候有个简单的限制,然而好像并没有什么卵用,
因为只要 NotifyThreadPoolOfPendingWork 方法启动了, 就会一直跑,直到所有的Task执行完成。那他的并发数是如何限制的呢?

一直很迷惑,是不是我哪里理解错了, 还请知道的大神解惑一下。

到此这篇关于C#  Task  TaskFactory  设置最大并行线程数的方法的文章就介绍到这了,更多相关C#  Task  TaskFactory并行线程数内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • c#中DateTime.Now函数的使用详解

    c#中DateTime.Now函数的使用详解

    本篇文章对c#中DateTime.Now函数的使用进行了介绍。需要的朋友参考下
    2013-05-05
  • C#如何遍历Dictionary

    C#如何遍历Dictionary

    这篇文章主要为大家详细介绍了C#遍历Dictionary的方法,.NET中的Dictionary是键/值对的集合,使用起来比较方便,Dictionary也可以用KeyValuePair来迭代遍历,感兴趣的小伙伴们可以参考一下
    2016-04-04
  • C#中的多态深入理解

    C#中的多态深入理解

    如果面试时主考官要求你用一句话来描述多态,尽可能的精炼,你会怎么回答?当然答案有很多,每个人的理解和表达不尽相同,但我比较趋向这样描述:通过继承实现的不同对象调用相同的方法,表现出不同的行为,称之为多态
    2014-01-01
  • C#实现冻结Excel窗口以锁定行列或解除冻结

    C#实现冻结Excel窗口以锁定行列或解除冻结

    在处理大型Excel工作簿时,有时候我们需要在工作表中冻结窗格,这样可以在滚动查看数据的同时保持某些行或列固定不动,下面我们就来看看如何使用C#实现冻结Excel窗口吧
    2024-04-04
  • DevExpress之ChartControl实现时间轴实例

    DevExpress之ChartControl实现时间轴实例

    这篇文章主要介绍了DevExpress中ChartControl实现时间轴的方法,涉及相关C#绘图程序用法,具有一定的实用价值,需要的朋友可以参考下
    2014-10-10
  • C#实现xml文件反序列化读入数据到object的方法

    C#实现xml文件反序列化读入数据到object的方法

    这篇文章主要介绍了C#实现xml文件反序列化读入数据到object的方法,涉及C#操作XML文件类型转换的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-07-07
  • C# 串口扫描枪读取数据的实现

    C# 串口扫描枪读取数据的实现

    本文主要介绍了C# 串口扫描枪读取数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-04-04
  • C#实现读取DataSet数据并显示在ListView控件中的方法

    C#实现读取DataSet数据并显示在ListView控件中的方法

    这篇文章主要介绍了C#实现读取DataSet数据并显示在ListView控件中的方法,涉及C#操作DataSet及ListView控件的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-10-10
  • C#中自定义高精度Timer定时器的实例教程

    C#中自定义高精度Timer定时器的实例教程

    这篇文章主要介绍了C#中自定义高精度Timer定时器的实例教程,多线程的Timer编写需要注意线程安全的问题,需要的朋友可以参考下
    2016-04-04
  • C#实现自定义圆角按钮的方法

    C#实现自定义圆角按钮的方法

    Winform中自带的button没有圆角属性,所以我们继承Button类,重写OnPaint事件来绘制圆角按钮。下面通过实例代码给大家介绍下C#实现自定义圆角按钮的方法,需要的朋友参考下吧
    2021-11-11

最新评论