2013-04-25 67 views
1

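.NET - Block the main thread until any thread is available

I have a process in which my main thread reads a file and breaks it into parts. Those parts then need further processing. I want to use any available threads so that the downstream processing uses as much of the CPU (or as many cores) as possible. I don't want the main thread to build up an excessive backlog, so I need the main thread to wait before adding to the queue until another thread is available.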

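I have seen many posts like "VB.NET 4.0: Looking to execute multiple threads, but wait until all threads are completed before resuming", but they all wait for all threads to complete, whereas I only need any one thread to become available.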

Is this something I can solve with the Task Parallel Library, or should I create threads manually and monitor the thread pool?

Using Reader As New StreamReader(FileName)
    Do
        CurrentBlockSize = Reader.ReadBlock(CurrentBuffer, 0, BufferSize)

        ' Append only the characters actually read; the final block may be short
        RunningBuffer &= New String(CurrentBuffer, 0, CurrentBlockSize)

        If RunningBuffer.Contains(RowDelimiter) Then
            LineParts = RunningBuffer.Split(RowDelimiter)

            For I As Integer = 0 To LineParts.Count - 1
                If I < LineParts.Count - 1 Then

                    ' Make synchronous call that blocks until
                    ' another thread is available to process the line
                    AddLineToTheProcessingQueue(LineParts(I))

                Else
                    ' The last part may be an incomplete line; carry it into the next block
                    RunningBuffer = LineParts(I)
                End If
            Next
        End If

    Loop While CurrentBlockSize = BufferSize
End Using
+0

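This is almost certainly a waste of effort; this kind of code is almost always disk-bound. A simple check: reboot your machine and run the single-threaded version. If the CPU load on one core does not reach 50%, adding more threads cannot make it faster. – 2013-04-25 18:47:56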

+0

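@HansPassant, the downstream work will involve processing the data and sending a lot of it to a database over TCP/IP, which will probably be slower than the disk reads. That is the part I want to multithread. – 2013-04-25 18:59:29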

+0

Then pull the data, don't push it. Just like most programs that read files do. – 2013-04-25 19:48:42

Answers

1

Paste this code into a new console application.

Imports System.Threading

Module Module1

    ' 6 was picked arbitrarily; tune it for your workload.
    ' Worker threads run queued work items; completion port threads service asynchronous I/O completions.
    Const MaxWorkerThreads As Integer = 6
    Const MaxCompletionPortThreads As Integer = 6

    Sub Main()

        ' SetMaxThreads returns False and changes nothing if a value is below the processor count
        ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads)

        Dim availableWorkerThreads As Integer
        Dim availableCompletionPortThreads As Integer

        For i As Integer = 0 To 100

            ' GetAvailableThreads returns results via output parameters
            ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)

            Dim tries As Integer = 0

            Do While (availableWorkerThreads = 0)
                ' this loop does not execute if there are available threads
                ' you may want to add a fail-safe to check "tries" in case the child threads get stuck
                tries += 1

                Console.WriteLine(String.Format("waiting to start item {0}, attempt {1}, available threads: {2}, {3}", i, tries, availableWorkerThreads, availableCompletionPortThreads))

                ' failure to call Sleep turns this loop into a busy-wait and makes your program unresponsive
                Thread.Sleep(1000)

                ' call GetAvailableThreads again for the next test at the top of the loop
                ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)
            Loop

            ' this is how you pass parameters to a work item queued through QueueUserWorkItem
            Dim parameters As Object() = {i}
            ThreadPool.QueueUserWorkItem(AddressOf DoWork, parameters)
            ' Pause so the pool can start the work item before the next availability check.
            ' Pool threads are background threads and do not keep the process alive after Main returns.
            Thread.Sleep(500)

        Next

    End Sub

    ' Matches the WaitCallback signature: the state argument arrives as Object
    Sub DoWork(state As Object)
        Dim parameters As Object() = DirectCast(state, Object())
        Dim itemNumber As Integer = CInt(parameters(0))
        Dim sleepLength As Integer = itemNumber * 1000
        Console.WriteLine(String.Format("Item: {0} - sleeping for {1} milliseconds.", itemNumber, sleepLength))
        Thread.Sleep(sleepLength)
        Console.WriteLine(String.Format("Item: {0} - done sleeping.", itemNumber))
    End Sub

End Module
0

I'm not sure exactly why you want to do this, but you can achieve something very similar using a BlockingCollection or a dataflow block with BoundedCapacity set.

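For example, if you set the capacity to 1 and your consumers are all busy at the moment, you will not be able to add a second item to the queue until one of the consumers finishes its current work and removes the waiting item from the queue. Both versions let you wait until you can add another item to the queue.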
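A minimal sketch of the BlockingCollection variant, not a definitive implementation. The consumer count and capacity of 4 are arbitrary assumptions, ProcessLine is a hypothetical stand-in for the real downstream work, and the numbered strings stand in for the lines produced by the file-reading loop in the question:

```vb
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module BoundedQueueSketch

    ' Hypothetical values, chosen for illustration only
    Const ConsumerCount As Integer = 4
    Const QueueCapacity As Integer = 4

    Sub Main()
        ' The constructor argument is the bounded capacity:
        ' Add blocks while QueueCapacity items are already waiting
        Using queue As New BlockingCollection(Of String)(QueueCapacity)

            ' Start a fixed number of long-running consumers
            Dim consumers(ConsumerCount - 1) As Task
            For c As Integer = 0 To ConsumerCount - 1
                consumers(c) = Task.Factory.StartNew(
                    Sub()
                        ' Blocks until an item arrives; the loop ends once
                        ' CompleteAdding has been called and the queue is empty
                        For Each line As String In queue.GetConsumingEnumerable()
                            ProcessLine(line)
                        Next
                    End Sub,
                    TaskCreationOptions.LongRunning)
            Next

            ' Producer: stands in for the file-reading loop
            For i As Integer = 1 To 20
                queue.Add("line " & i.ToString())   ' blocks while the queue is full
            Next

            ' Signal that no more items will arrive, then wait for the consumers to drain the queue
            queue.CompleteAdding()
            Task.WaitAll(consumers)
        End Using
    End Sub

    ' Hypothetical stand-in for parsing a line and sending it to the database
    Sub ProcessLine(line As String)
        Thread.Sleep(100)
        Console.WriteLine("Processed {0} on thread {1}", line, Thread.CurrentThread.ManagedThreadId)
    End Sub

End Module
```

With this shape the main thread never polls the thread pool: the bounded queue itself applies the back-pressure, and the backlog can never exceed QueueCapacity items.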

+0

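Yes, I started reading about TPL Dataflow and bounded capacity yesterday, and it looks very promising. I will report back after some testing. Do you have sample code for a dataflow block with bounded capacity? – 2013-04-25 15:10:41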

+0

@TomHalladay You can have a look at [this set of tests](https://github.com/mono/mono/blob/master/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BoundedCapacityTest.cs) in the Mono version of TDF. – svick 2013-04-25 16:05:22