2011-04-29 116 views
5

我有一个处理大块信息的子例程。为了利用整个CPU,它将工作划分为单独的线程。所有线程完成后,结束。我读过创建和销毁线程使用大量开销,所以我尝试使用线程池,但实际上运行速度比创建自己的线程慢。如何在程序运行时创建自己的线程,然后继续重用它们?我见过一些人说它不能完成,但是线程池是这样做的,所以它一定是可能的,对吧?如何在.NET 3.5中重用线程

这里是启动新线程的部分代码/使用线程池:

//initialization for threads 
Thread[] AltThread = null; 
if (NumThreads > 1) 
    AltThread = new Thread[pub.NumThreads - 1]; 

do 
{ 
    if (NumThreads > 1) 
    { //split the matrix up into NumThreads number of even-sized blocks and execute on separate threads 
     int ThreadWidth = DataWidth/NumThreads; 
     if (UseThreadPool) //use threadpool threads 
     { 
      for (int i = 0; i < NumThreads - 1; i++) 
      { 
       ThreadPool.QueueUserWorkItem(ComputePartialDataOnThread, 
        new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) }); 
      } 
      //get number of threads available after queue 
      System.Threading.Thread.Sleep(0); 
      int StartThreads, empty, EndThreads; 
      ThreadPool.GetAvailableThreads(out StartThreads, out empty); 
      ComputePartialData(ThisEngine, 0, ThreadWidth); 

      //wait for all threads to finish 
      do 
      { 
       ThreadPool.GetAvailableThreads(out EndThreads, out empty); 
       System.Threading.Thread.Sleep(1); 
      } while (StartThreads - EndThreads > 0); 
     } 
     else //create new threads each time (can we reuse these?) 
     { 
      for (int i = 0; i < NumThreads - 1; i++) 
      { 
       AltThread[i] = new Thread(ComputePartialDataOnThread); 
       AltThread[i].Start(new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) }); 
      } 
      ComputePartialData(ThisEngine, 0, ThreadWidth); 

      //wait for all threads to finish 
      foreach (Thread t in AltThread) 
       t.Join(1000); 
      foreach (Thread t in AltThread) 
       if (t.IsAlive) t.Abort(); 
     } 
    } 
} 

ComputePartialDataOnThread只是解包的信息,并调用ComputePartialData。将被处理的数据在线程中共享(它们不会尝试读取/写入相同的位置)。 AltEngine []是每个线程的独立计算引擎。

该操作使用线程池运行约10-20%。

+3

你可以发布你的代码,以便我们可以看到你在做什么?有可能你在线程池中做了一些错误,导致它很慢。 – 2011-04-29 01:42:06

+0

也许它只在你的测试运行中很慢,也就是说你点击了最初的线程数,所以它必须创建更多的线程来满足你的需求。在运行任何测试之前,尝试手动设置池中的最小线程数。 – 2011-04-29 01:50:55

+0

线程的数量意味着匹配处理器内核的数量。在这种情况下,它只有2. – HypnoToad 2011-04-29 02:07:29

回答

11

这听起来像它可以通过一个多线程的生产者 - 消费者队列来解决一个相当普遍的要求。线程保持“活动”,并在新工作添加到队列时发出信号通知工作。工作由一个代表(在你的情况下ComputePartialDataOnThread)和传递给委托的数据是什么排队(在你的情况下,参数ComputePartialDataOnThread)。有用的特性是管理工作线程的实现和实际的算法是分开的。这里是P-c排队:

public class SuperQueue<T> : IDisposable where T : class 
{ 
    readonly object _locker = new object(); 
    readonly List<Thread> _workers; 
    readonly Queue<T> _taskQueue = new Queue<T>(); 
    readonly Action<T> _dequeueAction; 

    /// <summary> 
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class. 
    /// </summary> 
    /// <param name="workerCount">The worker count.</param> 
    /// <param name="dequeueAction">The dequeue action.</param> 
    public SuperQueue(int workerCount, Action<T> dequeueAction) 
    { 
     _dequeueAction = dequeueAction; 
     _workers = new List<Thread>(workerCount); 

     // Create and start a separate thread for each worker 
     for (int i = 0; i < workerCount; i++) 
     { 
      Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}",i)}; 
      _workers.Add(t); 
      t.Start(); 
     } 
    } 

    /// <summary> 
    /// Enqueues the task. 
    /// </summary> 
    /// <param name="task">The task.</param> 
    public void EnqueueTask(T task) 
    { 
     lock (_locker) 
     { 
      _taskQueue.Enqueue(task); 
      Monitor.PulseAll(_locker); 
     } 
    } 

    /// <summary> 
    /// Consumes this instance. 
    /// </summary> 
    void Consume() 
    { 
     while (true) 
     { 
      T item; 
      lock (_locker) 
      { 
       while (_taskQueue.Count == 0) Monitor.Wait(_locker); 
       item = _taskQueue.Dequeue(); 
      } 
      if (item == null) return; 

      // run actual method 
      _dequeueAction(item); 
     } 
    } 

    /// <summary> 
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. 
    /// </summary> 
    public void Dispose() 
    { 
     // Enqueue one null task per worker to make each exit. 
     _workers.ForEach(thread => EnqueueTask(null)); 

     _workers.ForEach(thread => thread.Join()); 

    } 
} 

正如前面的海报说,有很多内置的结构(看TPL),它使用线程池,你可能想实现自己的队列之前看。

+0

谢谢,这看起来是一个很好的做法。我会看看这个类是否比线程池快。 – HypnoToad 2011-04-29 21:18:24

+0

@DoctorZero:你成功使用它,速度更快吗? – hofnarwillie 2013-08-21 12:14:47

+0

是的,在它的最终化身中,它有点不同,也更复杂,但是也是相同的想法。几秒钟不活动后线程进入低优先级模式,然后在不活动30秒后退出。这符合他们期望的工作类型和频率。它的运行速度接近ThreadPool的速度,但不那么紧张。此外,我发现我可以通过将数据分割成相邻的行而不是大块来获得更好的CPU缓存,这也有所改变。 – HypnoToad 2013-11-16 13:39:49

2

所以通常的方式人会做到这一点是让每个线程的入口点主要做一些类似(这只是一个算法,不C#代码,抱歉)的东西:

  1. 检查,看看是否有工作做
  2. 做的工作,如果发现信号

  • 等待对方,只要你有你的线程更多的工作,将其添加到工作队列中埃森做,然后你的线程ce正在被重用。这与如何自行实现线程池非常相似(如果您在运行时可以做其他一些事情来帮助您,但这不是超级大事)。

  • +0

    如何在不占用CPU的情况下等待信号? – HypnoToad 2011-04-29 02:09:47

    +0

    当您等待某个信号时,您只需将其设置为阻止状态,并在有人发出信号指示调度程序使您可以运行时。虽然阻塞你没有时间在CPU上(但这不是最有效的东西,所以通常操作系统将只是忙着等一下)。被阻塞的线程不消耗处理器电源。 – 2011-04-29 02:35:37