2011-06-01 27 views
15

我有一个队列,可以在其上放置未决的傅立叶变换请求(相对耗时的操作) - 在某些情况下,我们可以每秒获得数千个转换请求,因此它的速度要快。使用线程处理队列的最有效方法

我正在升级旧代码以使用.net 4,以及移植到TPL。我想知道处理这个队列的最有效(最快的吞吐量)方式是什么样子。我想使用所有可用的内核。

目前我正在尝试使用BlockingCollection。我创建了一个队列处理器类,它产生了4个任务,这些任务阻塞了BlockingCollection并等待传入​​的工作。然后他们处理该待处理的转换。代码:

public class IncomingPacketQueue : IDisposable 
    { 
     BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>(); 

     public IncomingPacketQueue(int workerCount) 
     { 
      for (int i = 0; i < workerCount; i++) 
      { 
       Task.Factory.StartNew(Consume); 
      } 
     } 

     public void EnqueueSweep(IncomingPacket incoming) 
     { 
      _packetQ.Add(incoming); 
     } 

     private void Consume() 
     { 
      foreach (var sweep in _packetQ.GetConsumingEnumerable()) 
      { 
       //do stuff 
       var worker = new IfftWorker(); 
       Trace.WriteLine(" Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId)); 
       worker.DoIfft(sweep);     

      } 
     } 

     public int QueueCount 
     { 
      get 
      { 
       return _packetQ.Count; 
      } 
     } 

    #region IDisposable Members 

    public void Dispose() 
    { 
     _packetQ.CompleteAdding(); 
    } 

    #endregion 
    } 

这看起来像一个很好的解决方案吗?它似乎最大限度地利用了所有内核 - 尽管我目前不确定我应该在构造函数中产生多少工人。

+0

在一个侧面说明,你也可以使用英特尔IPP用来加快傅立叶变换工作(然后每个线程在一个内核上运行将使用SSE指令做这项工作),但是然后你有互操作的乐趣;-) – 2011-06-01 17:13:46

回答

7

这看起来很合理。我发现BlockingCollection相当快。我使用它来处理每秒数以万计的请求。

如果您的应用程序受处理器限制,那么您可能不希望创建比核心更多的工作人员。当然,你不想创造比核心更多的工人。在四核机器上,如果你期望大部分时间都花在做FFT上,那么四个工作人员会吃掉所有的CPU。更多的工作者意味着更多的是你有线程上下文切换来处理。第三方物流公司通常会为你平衡这一点,但当你无法处理的人数不多时,没有理由创造100名员工。

我建议你用3,4,5,6,7和8名工人进行测试。看看哪一个能给你最好的吞吐量。

+0

4给了我最好的 - 虽然它没有太多。我将坚持这一点,并使数字可配置,默认为Env.ProcessorCount(如果未设置)。 – 2011-06-01 20:36:42

0

使工作人员数量可配置。也有太多的工人,它会变慢(如另一张海报所示),所以你需要找到最佳点。可配置的值将允许测试运行找到最佳值,或者允许您的程序适用于不同类型的硬件。 YOu当然可以将这个值放在App.Config中并在启动时读取它。

2

我同意吉姆。你的方法看起来非常好。你不会做得更好。我不是FFT专家,但我假设这些操作几乎100%CPU受限。如果情况确实如此,那么对工人数量的首先猜测就是与机器内核的数量直接进行1对1关联。您可以使用Environment.ProcessorCount来获取此值。您可以尝试乘以2倍或4倍的乘数,但是如果这些操作受CPU限制,那么高于1倍的任何操作都可能导致更多开销。使用Environment.ProcessorCount将使您的代码更加便携。

另一个建议...让TPL知道这些是专用线程。您可以通过指定LongRunning选项来完成此操作。

public IncomingPacketQueue() 
{ 
    for (int i = 0; i < Environment.ProcessorCount; i++) 
    { 
     Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning); 
    } 
} 
+0

我同意,但您可能也想忽略来自HyperThreading的核心,只考虑真正的核心。 – 2011-06-01 17:16:39

+0

好的提示与Env.ProcessorCount ...应该适合我。 – 2011-06-01 20:26:36

0

您也可以尝试使用PLINQ并行处理来查看它与当前使用的方法的对比情况。它有一些窍门可以使它在某些情况下非常有效。

_packetQ.GetConsumingEnumerable().AsParallel().ForAll(
    sweep => new IfftWorker().DoIfft(sweep)); 
+1

您不能将PLINQ与BlockingCollection一起使用。默认分区程序可能会错过项目或死锁。始终使用ParallelExtensionsExtras中的BlockingCollectionPartitioner – adrianm 2011-06-01 17:56:01

2

为什么不使用Parallel.ForEach并让TPL处理创建的线程数。

 Parallel.ForEach(BlockingCollectionExtensions.GetConsumingPartitioneenter(_packetQ), 
         sweep => { 
          //do stuff 
          var worker = new IfftWorker(); 
          Trace.WriteLine(" Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId)); 
          worker.DoIfft(sweep);     

         }); 

(该GetConsumingPartitioner是ParallelExtensionsExtras的一部分)

+0

这看起来也是一个不错的解决方案 - 我会玩这个,看看我得到了什么:) – 2011-06-01 20:22:06

相关问题