2014-03-26 171 views

Expert: adding and removing multithreaded producers and consumers with BlockingCollection in VB.NET

This is what happens when you can't find a solution anywhere on the Interwebs and have to hack at it until it looks good [enough].

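I have a situation where I need to parse objects that come in at high speed, but parsing takes a relatively long time, far slower than the rate they come in... I can easily tell how many cores are on the box, and then I can split the work across a team of worker threads (Tasks) and divide & conquer! But the problem with any multithreaded application is "How many threads?"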

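There is no hard & fast answer, so don't bother looking. My approach is to build something flexible that my main thread can monitor, to see whether throughput (total work completed in X time) is maxed out on the machine it's running on. Also, the same machine can vary over time in load, memory, etc., so you can't just set it & forget it...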

I'm only asking so that I can answer my own question, which is encouraged.


What's the point of asking the question if you already have a solution? –


@JimMischel [To share it with others](http://blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) – svick


@svick: Thanks. I'll keep that in mind. –

Answers


[Note: line numbers refer to the Crayon WordPress plugin on the original blog post here. At that link you can also download a 7z of the entire VS 2012 solution.]

So, here's the idea:

[Diagram of the idea; image not included]

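Let's look at how I solved the blocking-collection problem and then put our work into parser tasks... I used a very generic, easily reusable producer/consumer approach. In fact, in my example you can even adjust the number of producers. You can adapt my code below to be closer to your own work, then adjust the number of threads/tasks/consumers to see how well parallelism works for it.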
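For orientation before the full demo, here is a minimal sketch of the same pattern, assuming .NET 4.5 (for Task.Run), one producer and two consumers. It ends the consumers with CompleteAdding() plus GetConsumingEnumerable() rather than an IsCompleted/Take() loop; MinimalPipeline and RunPipeline are made-up names for illustration.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module MinimalPipeline
    ' Hypothetical demo: one producer adds 1..count, two consumers drain the queue,
    ' and the function returns the sum of everything consumed.
    Function RunPipeline(count As Integer) As Integer
        Dim total As Integer = 0
        Using bc As New BlockingCollection(Of Integer)()
            Dim producer As Task = Task.Run(Sub()
                                                For i As Integer = 1 To count
                                                    bc.Add(i)
                                                Next
                                                bc.CompleteAdding() ' tells consumers no more items are coming
                                            End Sub)
            Dim consume As Action = Sub()
                                        ' Removes each item as it is enumerated; the loop ends once
                                        ' adding is complete and the queue is empty
                                        For Each item As Integer In bc.GetConsumingEnumerable()
                                            Interlocked.Add(total, item)
                                        Next
                                    End Sub
            Dim c1 As Task = Task.Run(consume)
            Dim c2 As Task = Task.Run(consume)
            Task.WaitAll(producer, c1, c2)
        End Using
        Return total
    End Function

    Sub Main()
        Console.WriteLine(RunPipeline(100)) ' 5050
    End Sub
End Module
```

Every item is consumed exactly once no matter which consumer gets it, so the sum is deterministic even though the interleaving is not.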

First, import the usual suspects...

Imports System
Imports System.Threading 
Imports System.Threading.Tasks 
Imports System.Collections.Concurrent 
Imports System.Diagnostics ' Stopwatch
Imports System.Linq        ' ElementAt()

Module modStartHere 
    ' 
    ' Producer 
    ' 
    Dim itemsToProduce = 10 
    Dim sleepProducer = 10 ' in milliseconds 
    Dim producerStartID = 1 
    Dim producersNumToStart = 1 
    Dim ProducerCTSs As New ConcurrentBag(Of CancellationTokenSource) 
    Dim moreItemsToAdd As Boolean = True 
    ' 
    ' Consumer 
    ' 
    Dim sleepConsumer = 1000 ' in milliseconds 
    Dim consumerStartID = 100 
    Dim consumersNumToStart = 3 
    Dim ConsumerCTSs As New ConcurrentBag(Of CancellationTokenSource) 

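Above is the initial setup for the producer(s) and consumer(s). Although itemsToProduce does not change while the program runs, the number of producers will. This is a very rough draft that you would no doubt streamline in your own code at some point, but it shows a good way to approach the problem.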

I use the "IDs" in the output to show which thread is doing what. All you really need in production is the lists of CTS instances:

' 
' the multi-thread-safe queue that is produced to and consumed from 
' 
Dim bc As New BlockingCollection(Of Integer) 
' 
' this # will be what is actually produced & consumed (1, 2, 3, ...) 
' 
Dim itemId As Integer = 0 
' 

The main machinery here is one small line:

Dim bc As New BlockingCollection(Of Integer)

Microsoft says:

BlockingCollection Overview (.NET Framework 4.5)

BlockingCollection(Of T) is a thread-safe collection class that provides the following features:

An implementation of the Producer-Consumer pattern. 
Concurrent adding and taking of items from multiple threads. 
Optional maximum capacity. 
Insertion and removal operations that block when collection is empty or full. 
Insertion and removal "try" operations that do not block or that block up to a specified period of time. 
Encapsulates any collection type that implements IProducerConsumerCollection(Of T) 
Cancellation with cancellation tokens. 
Two kinds of enumeration with foreach (For Each in Visual Basic): 

-Read-only enumeration. 
-Enumeration that removes items as they are enumerated. 

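itemId is just the variable that holds the fake payload. The producer increments it by 1 to simulate different object instances or units of work. To carry real work, you just change the type the BlockingCollection holds...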
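Swapping the payload is mostly a matter of the type argument. A sketch, assuming a hypothetical ParseJob class as a stand-in for whatever objects you actually need to parse:

```vbnet
Imports System
Imports System.Collections.Concurrent

' Hypothetical payload type; replace it with the objects you actually parse.
Public Class ParseJob
    Public Property Id As Integer
    Public Property RawText As String
End Class

Module PayloadDemo
    ' Queues two jobs and returns the RawText of the first one taken back out.
    Function FirstRawText() As String
        Using bc As New BlockingCollection(Of ParseJob)()
            bc.Add(New ParseJob With {.Id = 1, .RawText = "first"})
            bc.Add(New ParseJob With {.Id = 2, .RawText = "second"})
            bc.CompleteAdding()
            ' The default underlying store is a ConcurrentQueue, so the first job added comes out first
            Return bc.Take().RawText
        End Using
    End Function

    Sub Main()
        Console.WriteLine(FirstRawText()) ' first
    End Sub
End Module
```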

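This demo uses the default constructor, which wraps a ConcurrentQueue(Of T), so items already come out FIFO (which is what I'll need in production). But, as Microsoft explains, you can pick the underlying collection yourself, and even go FILO: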

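When you create a BlockingCollection(Of T) object, you can specify not only the bounded capacity but also the type of collection to use. For example, you could specify a ConcurrentQueue(Of T) object for first in, first out (FIFO) behavior, or a ConcurrentStack(Of T) object for last in, first out (LIFO) behavior.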

Now that's useful! But, as I said, for my particular needs I need FIFO, as shown in the diagram at the top...
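To see what the choice of underlying collection does, here is a single-threaded sketch (OrderingDemo and DrainOrder are illustrative names) that feeds 1, 2, 3 through a bounded BlockingCollection backed by either a ConcurrentQueue or a ConcurrentStack:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module OrderingDemo
    ' Adds 1, 2, 3 and returns them in the order Take() hands them back.
    Function DrainOrder(useStack As Boolean) As List(Of Integer)
        Dim store As IProducerConsumerCollection(Of Integer)
        If useStack Then
            store = New ConcurrentStack(Of Integer)() ' LIFO
        Else
            store = New ConcurrentQueue(Of Integer)() ' FIFO, same as the default constructor
        End If
        Dim result As New List(Of Integer)()
        ' The second argument is the bounded capacity: Add() blocks while 10 items are waiting
        Using bc As New BlockingCollection(Of Integer)(store, 10)
            For i As Integer = 1 To 3
                bc.Add(i)
            Next
            bc.CompleteAdding()
            While Not bc.IsCompleted
                result.Add(bc.Take())
            End While
        End Using
        Return result
    End Function

    Sub Main()
        Console.WriteLine(String.Join(",", DrainOrder(False).ConvertAll(Function(x) x.ToString()))) ' 1,2,3
        Console.WriteLine(String.Join(",", DrainOrder(True).ConvertAll(Function(x) x.ToString())))  ' 3,2,1
    End Sub
End Module
```

With useStack = False you get the same ordering as the demo's plain New BlockingCollection(Of Integer).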

You'll see the functions and subs later, but the real magic here is in two collections, one for producers and one for consumers:

Dim ProducerCTSs As New ConcurrentBag(Of CancellationTokenSource)
Dim ConsumerCTSs As New ConcurrentBag(Of CancellationTokenSource)

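The magic: as each Task (thread) is created, its CancellationTokenSource is added to the appropriate collection above, and to shut a task down you pick a CancellationTokenSource from that collection and cancel it. (A ConcurrentBag cannot remove a specific item, and this demo never takes cancelled sources back out, so they stay in the bag.)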
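Because a ConcurrentBag cannot remove a specific item, a cancelled CancellationTokenSource stays in it. One possible alternative, sketched here with hypothetical names (CtsRegistry, Register, CancelTask), is to key each source by its task ID in a ConcurrentDictionary, which does support removal:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

Module CtsRegistry
    ' Hypothetical alternative to the ConcurrentBag: each CTS is keyed by its task ID,
    ' so it really leaves the collection when its task is cancelled.
    Private ReadOnly running As New ConcurrentDictionary(Of Integer, CancellationTokenSource)()

    ' Creates and stores a CTS for a new task; returns the token to hand to that task.
    Function Register(taskId As Integer) As CancellationToken
        Dim cts As New CancellationTokenSource()
        If Not running.TryAdd(taskId, cts) Then
            cts.Dispose()
            Throw New ArgumentException("Task ID already registered: " & taskId)
        End If
        Return cts.Token
    End Function

    ' Cancels and removes one task's CTS; returns False if that ID is not registered.
    Function CancelTask(taskId As Integer) As Boolean
        Dim cts As CancellationTokenSource = Nothing
        If running.TryRemove(taskId, cts) Then
            cts.Cancel()
            Return True
        End If
        Return False
    End Function

    Function RunningCount() As Integer
        Return running.Count
    End Function

    Sub Main()
        Dim token As CancellationToken = Register(555)
        Console.WriteLine(RunningCount())                ' 1
        Console.WriteLine(CancelTask(555))               ' True
        Console.WriteLine(token.IsCancellationRequested) ' True
        Console.WriteLine(RunningCount())                ' 0
    End Sub
End Module
```

To cancel a random task instead of a specific one, you could pick a random key from running.Keys before calling CancelTask.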

Seriously, that's it! :)

Next, the code that creates the initial producers and consumers:

'=============================== 
' 
' start demo 
' 
Sub Main() 
    ' 
    '=============================== 
    ' 
    ' initial state: 
    ' 
    ' start our producer(s) 
    ' 
    For ps As Integer = producerStartID To producerStartID + producersNumToStart - 1 
     CreateTask(ps, "Producer") 
    Next 
    ' 
    ' start our consumer(s) 
    ' 
    For cs As Integer = consumerStartID To consumerStartID + consumersNumToStart - 1 
     CreateTask(cs, "Consumer") 
    Next 
    ' 
    '========================================= 

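Apart from a few Thread.Sleep() calls, everything in the next part just adds or removes producer and consumer tasks (threads). You can change the initial values at the top to put it through its paces.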

To create a task: CreateTask(<taskId>, <"Producer" or "Consumer">)

To remove a task, you get a random CTS and .Cancel() it, all in one line:

GetRandomCTS(ProducerCTSs).Cancel()
GetRandomCTS(ConsumerCTSs).Cancel()

GetRandomCTS() takes a collection of CTS instances, picks a random one, and returns it; the caller then calls Cancel() on it.
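Since the bags never shrink and GetRandomCTS() returns Nothing when a bag is empty (so .Cancel() would throw), a helper that only picks among sources that are still live may be handier. A sketch with a hypothetical TryCancelRandom function, assuming it is only called from one thread (System.Random is not thread-safe):

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Threading

Module RandomCancel
    ' One shared generator: creating New Random(12345) on every call repeats the same sequence.
    Private ReadOnly rnd As New Random()

    ' Hypothetical helper: cancels one randomly chosen source that has not been cancelled yet.
    ' Returns False when nothing is left to cancel, so callers never call .Cancel() on Nothing.
    Function TryCancelRandom(bag As ConcurrentBag(Of CancellationTokenSource)) As Boolean
        ' The bag keeps cancelled sources, so filter them out of a snapshot first
        Dim live As CancellationTokenSource() = bag.Where(Function(c) Not c.IsCancellationRequested).ToArray()
        If live.Length = 0 Then Return False
        ' Random.Next(min, max) never returns max, so pass live.Length to make every entry reachable
        live(rnd.Next(0, live.Length)).Cancel()
        Return True
    End Function

    Sub Main()
        Dim bag As New ConcurrentBag(Of CancellationTokenSource)()
        bag.Add(New CancellationTokenSource())
        bag.Add(New CancellationTokenSource())
        Console.WriteLine(TryCancelRandom(bag)) ' True
        Console.WriteLine(TryCancelRandom(bag)) ' True
        Console.WriteLine(TryCancelRandom(bag)) ' False: both are already cancelled
    End Sub
End Module
```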

' 
Thread.Sleep(2000) 
' 
' create a producer 
' 
Console.WriteLine("creating producer 555...") 
CreateTask(555, "Producer") 
Thread.Sleep(1000) 
' 
' cancel a consumer 
' 
Console.WriteLine("cancelling random consumer...") 
GetRandomCTS(ConsumerCTSs).Cancel() 
Thread.Sleep(2000) 
' 
' cancel a consumer 
' 
Console.WriteLine("cancelling random consumer...") 
GetRandomCTS(ConsumerCTSs).Cancel() 
Thread.Sleep(1000) 
' 
' create a consumer 
' 
Console.WriteLine("creating consumer 222...") 
CreateTask(222, "consumer") 
Thread.Sleep(1000) 
' 
' cancel a producer 
' 
Console.WriteLine("cancelling random producer...") 
GetRandomCTS(ProducerCTSs).Cancel() 
Thread.Sleep(1000) 
' 
' cancel a consumer 
' 
Console.WriteLine("cancelling random consumer...") 
GetRandomCTS(ConsumerCTSs).Cancel() 
' 
'========================================== 
' 
Console.ReadLine() 

End Sub

And that's it!

Now for the fun part:

#Region "Utilities" 
    ' One shared generator: a New Random(12345) created on every call always returns the same "random" index
    Private ReadOnly rndNum As New Random()

    ''' <summary> 
    ''' Retrieves a random cancellation token source from the given list of current threads... 
    ''' Works for either producer or consumer 
    ''' </summary> 
    ''' <param name="ctsBag">ConcurrentBag(Of CancellationTokenSource)</param> 
    ''' <returns>CancellationTokenSource, or Nothing if the bag is empty</returns> 
    ''' <remarks>Cancelled CTSs are never removed from the bag, so the one returned may already be cancelled.</remarks> 
    Function GetRandomCTS(ctsBag As ConcurrentBag(Of CancellationTokenSource)) As CancellationTokenSource 
     Dim cts As CancellationTokenSource = Nothing 
     Dim rndIndex As Integer = 0 
     Try 
      If ctsBag.IsEmpty Then 
       Console.WriteLine("There are no threads to cancel!") 
      Else 
       ' Random.Next's upper bound is exclusive, so pass Count (not Count - 1) to make every element reachable
       rndIndex = rndNum.Next(0, ctsBag.Count) 
       cts = ctsBag.ElementAt(rndIndex) ' ElementAt() takes a zero-based index
      End If 
     Catch ex As Exception 
      Console.WriteLine("GetRandomCTS() Exception: " & ex.StackTrace) 
     End Try 
     Return cts 
    End Function 

The cts variable (Dim cts As CancellationTokenSource = Nothing) is what we return: a CancellationTokenSource.

Line 16: ctsBag.ElementAt() lets us pull out a specific CTS instance by its index.

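Below, CreateTask takes the ID you want it to display while running (just for the demo, to see which thread is doing what) and a string telling it whether you want a new Consumer or Producer. Of course I could make it fancier, but this is just a rough draft. :)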

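    Private Function CreateTask(taskId As Integer, taskType As String) As CancellationTokenSource 
     Dim t As Task = Nothing 
     Dim cts As New CancellationTokenSource() 
     Dim token As CancellationToken = cts.Token 
     Try 
      ' Register the CTS before starting the task, so it can be found and cancelled as soon as the task runs.
      ' TaskScheduler.Default is passed explicitly: there is no StartNew(Action, CancellationToken, TaskCreationOptions)
      ' overload, so without it VB binds to StartNew(Action(Of Object), state, TaskCreationOptions) and the
      ' token is passed as the task's state rather than as its cancellation token.
      If taskType.ToLower = "producer" Then 
       ProducerCTSs.Add(cts) 
       t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default) 
      ElseIf taskType.ToLower = "consumer" Then 
       ConsumerCTSs.Add(cts) 
       t = Task.Factory.StartNew(Sub() Consumer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default) 
      Else 
       Console.WriteLine("CreateTask: unknown task type ""{0}""", taskType) 
       Return Nothing 
      End If 
      Console.WriteLine("{0} Task {1} ({2}) running!", taskType, taskId.ToString("000"), t.Id) 
     Catch ex As Exception 
      Console.WriteLine("Task {0} CreateTask({1}) Exception: {2}", taskId.ToString("000"), taskType, ex.StackTrace) 
     End Try 
     Return cts 
    End Function 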
#End Region 

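Lines 7 & 10: these call the Producer() or Consumer() subs below, passing in the CancellationToken from their CancellationTokenSource so that they can run and still be cancelled gracefully without corrupting any data.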

t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning)

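Did you notice TaskCreationOptions.LongRunning? It suits my case: it tells the task scheduler that the task is a long-running, coarse-grained operation, so the default scheduler gives it a dedicated thread instead of tying up a thread-pool thread. It does not change how cancellation works.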
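Here is a small sketch of the StartNew overload that takes both a cancellation token and creation options, TaskFactory.StartNew(Action, CancellationToken, TaskCreationOptions, TaskScheduler), assuming TaskScheduler.Default is the scheduler you want. Because the token is attached to the task itself, a task whose token is already cancelled never runs and ends up in the Canceled state. LongRunningDemo and its functions are illustrative names.

```vbnet
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module LongRunningDemo
    ' Starts work with the LongRunning hint and with the token attached to the Task itself.
    Function StartLongRunning(work As Action, token As CancellationToken) As Task
        Return Task.Factory.StartNew(work, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
    End Function

    ' Starts a task with an already-cancelled token and reports its final status.
    Function StatusWhenPreCancelled() As TaskStatus
        Using cts As New CancellationTokenSource()
            cts.Cancel() ' cancelled before the task is even created
            Dim t As Task = StartLongRunning(Sub() Console.WriteLine("never printed"), cts.Token)
            Try
                t.Wait()
            Catch ex As AggregateException
                ' Wait() reports the cancellation as an AggregateException
            End Try
            Return t.Status
        End Using
    End Function

    Sub Main()
        ' ToString() so the enum name is printed rather than its numeric value
        Console.WriteLine(StatusWhenPreCancelled().ToString()) ' Canceled
    End Sub
End Module
```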

So what does the Producer() sub look like?

#Region "Producer(s)" 
    ' Number of producers currently running; the last one to finish calls CompleteAdding()
    Private activeProducers As Integer = 0

    Public Sub Producer(ByVal taskNum As Integer, ByVal ct As CancellationToken) 
     ' Was cancellation already requested? 
     If ct.IsCancellationRequested = True Then 
      Console.WriteLine("Producer Task {0} was cancelled before Producer thread created!", taskNum.ToString("000")) 
      ct.ThrowIfCancellationRequested() 
     End If 
     Interlocked.Increment(activeProducers)
     ' 
     'Dim r As Random = New Random(123) 
     Dim sw As New Stopwatch 
     Dim numAdded As Integer = 0 
     sw.Start() 
     While moreItemsToAdd = True 
      ' Dim itemIn As Integer = r.Next(1, 1000) 
      ' Interlocked.Increment rather than "itemId += 1": several producers may update itemId at once
      Dim newItem As Integer = Interlocked.Increment(itemId) ' the payload
      ' Stop on the total produced so far; bc.Count shrinks as consumers take items
      If newItem > itemsToProduce Then
       moreItemsToAdd = False
       Exit While
      End If
      Try 
       bc.Add(newItem) 
       Console.WriteLine("--> " & taskNum.ToString("000") & " --> [+1 => Q has: " & bc.Count & "] added: " & newItem) 
       numAdded += 1 
       If ct.IsCancellationRequested Then 
        Console.WriteLine("Producer Task {0} cancelled", taskNum.ToString("000")) 
        ct.ThrowIfCancellationRequested() 
       End If 
       Thread.Sleep(sleepProducer) 
      Catch ex As OperationCanceledException 
       Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!") 
       Exit While 
      Catch ex As InvalidOperationException 
       ' Add() after CompleteAdding(): nothing more can be added, so stop
       Exit While 
      Catch ex As Exception 
       Console.WriteLine("Producer() Exception: " & ex.StackTrace) 
      End Try 
     End While 
     sw.Stop() 
     Console.WriteLine("Producer stopped adding items! Added " & numAdded & " items in " & sw.Elapsed.TotalSeconds & " seconds!") 
     ' Let consumers know we are done, but only when the last running producer stops
     If Interlocked.Decrement(activeProducers) = 0 Then 
      bc.CompleteAdding() 
     End If 
    End Sub 
#End Region 

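I know, I know... it looks complicated! But it isn't. I'm not that clever! Half of the code just catches & handles cancellation requests so that processing doesn't corrupt any data. Then there's a cheesy Stopwatch() to time things... in fact, artifacts of an earlier version are still commented out. Like I said, "rough"...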

Line 17: simply add itemId (our payload, which could be anything) to the BlockingCollection (bc).
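When more than one producer is running, itemId += 1 is a read-modify-write that two threads can interleave, so two producers can hand out the same payload. A sketch (UniqueIds and DistinctIdsFrom are hypothetical names) showing that Interlocked.Increment gives every producer a distinct value:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module UniqueIds
    Private nextId As Integer = 0

    ' Several "producers" draw payload IDs from one shared counter;
    ' returns how many distinct IDs were handed out in total.
    Function DistinctIdsFrom(producers As Integer, idsEach As Integer) As Integer
        Dim seen As New ConcurrentDictionary(Of Integer, Boolean)()
        Dim tasks(producers - 1) As Task
        For p As Integer = 0 To producers - 1
            tasks(p) = Task.Run(Sub()
                                    For i As Integer = 1 To idsEach
                                        ' Atomic increment: no two producers can receive the same value
                                        Dim id As Integer = Interlocked.Increment(nextId)
                                        seen.TryAdd(id, True)
                                    Next
                                End Sub)
        Next
        Task.WaitAll(tasks)
        Return seen.Count
    End Function

    Sub Main()
        Console.WriteLine(DistinctIdsFrom(4, 10000)) ' 40000
    End Sub
End Module
```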

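Line 20: if we've been cancelled, we handle it here, at a known point, rather than in some random part of the function, which could corrupt all sorts of things...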

Line 31: I added this as a cheesy way to tell the producer(s) when to stop... producing. The variable (the limit) is set at the top of the code.

Line 38: bc.CompleteAdding() is a signal to everyone using bc (the BlockingCollection) that no more items will be added. That way the consumers know when to stop... consuming!
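With several producers, whichever one calls CompleteAdding() first ends adding for everyone, and any later Add() throws InvalidOperationException. One way around that, sketched below with hypothetical names, is to count running producers and let only the last one out call CompleteAdding():

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Threading
Imports System.Threading.Tasks

Module LastProducerCompletes
    ' Runs several producers against one queue; only the last producer to finish calls
    ' CompleteAdding(), so an early finisher cannot shut out the others.
    ' Returns how many items the consumer loop saw. Assumes producerCount >= 1.
    Function Run(producerCount As Integer, itemsEach As Integer) As Integer
        Dim active As Integer = producerCount
        Dim seen As Integer = 0
        Using bc As New BlockingCollection(Of Integer)()
            Dim producers As New List(Of Task)()
            For p As Integer = 1 To producerCount
                producers.Add(Task.Run(Sub()
                                           For i As Integer = 1 To itemsEach
                                               bc.Add(i)
                                           Next
                                           ' Decrement returns the new value: 0 means "I was the last one"
                                           If Interlocked.Decrement(active) = 0 Then bc.CompleteAdding()
                                       End Sub))
            Next
            ' Ends only after CompleteAdding() has been called and the queue is empty
            For Each item As Integer In bc.GetConsumingEnumerable()
                seen += 1
            Next
            Task.WaitAll(producers.ToArray())
        End Using
        Return seen
    End Function

    Sub Main()
        Console.WriteLine(Run(3, 4)) ' 12
    End Sub
End Module
```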

"Why would they do that?"

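Well, suppose you want short-running tasks, or tasks that need to know the others are done before they can carry on... Yes, in my case they're long-running, and in production I'll need to start each task with TaskCreationOptions.LongRunning.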

The Consumer() sub is almost the same, with just a few minor differences:

#Region "Consumer(s)" 
    Public Sub Consumer(ByVal taskNum As Integer, ByVal ct As CancellationToken) 
     If ct.IsCancellationRequested = True Then ' Was cancellation already requested? 
      Console.WriteLine("Consumer Task {0} was cancelled before Consumer thread created!", taskNum.ToString("000")) 
      ct.ThrowIfCancellationRequested() 
     End If 
     Dim totalTaken As Integer = 0 
     Dim sw As New Stopwatch 
     sw.Start() 
     While bc.IsCompleted = False 
      Dim itemOut As Integer = 0 ' the payload 
      Try 
       ' Take(ct) blocks until an item arrives, the collection completes, or ct is cancelled,
       ' so an idle consumer can be cancelled too (plain Take() ignores the token)
       itemOut = bc.Take(ct) 
       Console.WriteLine("<-- " & taskNum.ToString("000") & " <-- [-1 => Q has: " & bc.Count & "] took: " & itemOut) 
       totalTaken += 1 
       ' "Process" the item we already took before honouring cancellation, so it is not lost
       Thread.Sleep(sleepConsumer) 
       If ct.IsCancellationRequested Then 
        Console.WriteLine("Consumer Task {0} cancelled", taskNum.ToString("000")) 
        ct.ThrowIfCancellationRequested() 
       End If 
      Catch ex As OperationCanceledException 
       Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!") 
       Exit While 
      Catch e As InvalidOperationException 
       ' IOE means that Take() was called on a completed collection. 
       ' In this example, we can simply catch the exception since the 
       ' loop will break on the next iteration. 
      End Try 
     End While 
     sw.Stop() 
     If bc.IsCompleted = True Then 
      Console.WriteLine(vbCrLf & "Task " & taskNum.ToString("000") & " - No more items to take. Took " & totalTaken & " items in " & sw.Elapsed.TotalSeconds & " seconds!") 
     End If 
    End Sub 
#End Region 
End Module 

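Line 3: in both subs, we check right at the top whether cancellation was already requested, so we don't waste time or resources if, say, another task/thread did the last piece of work just as we were being instantiated. Then we take the next item (FIFO or FILO/LIFO, depending on how the collection was configured, as discussed earlier): the BlockingCollection does it all!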

When you sit down and look at it, all the other code in this sub is just dressing for line 13!
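Plain Take() does not look at the cancellation token, so a consumer waiting on an empty queue only notices cancellation after another item arrives (or the collection completes). The Take(CancellationToken) overload wakes it up instead. A minimal sketch, with illustrative names, assuming .NET 4.5 for CancelAfter:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

Module CancellableTake
    ' Returns True if a consumer blocked on an empty queue is released by its token.
    Function IdleConsumerCanBeCancelled() As Boolean
        Dim released As Boolean = False
        Using bc As New BlockingCollection(Of Integer)()
            Using cts As New CancellationTokenSource()
                cts.CancelAfter(200) ' nothing is ever added, so only cancellation can end the wait
                Try
                    bc.Take(cts.Token) ' blocks until an item arrives or the token is cancelled
                Catch ex As OperationCanceledException
                    released = True
                End Try
            End Using
        End Using
        Return released
    End Function

    Sub Main()
        Console.WriteLine(IdleConsumerCanBeCancelled()) ' True
    End Sub
End Module
```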

So let's fire this puppy up!

Producer Task 001 (1) running! 
--> 001 --> [+1 => Q has: 1] added: 1 
<-- 100 <-- [-1 => Q has: 0] took: 1 
Consumer Task 100 (2) running! 
Consumer Task 101 (3) running! 
Consumer Task 102 (4) running! 
--> 001 --> [+1 => Q has: 1] added: 2 
--> 001 --> [+1 => Q has: 2] added: 3 
--> 001 --> [+1 => Q has: 3] added: 4 
--> 001 --> [+1 => Q has: 4] added: 5 
--> 001 --> [+1 => Q has: 5] added: 6 
--> 001 --> [+1 => Q has: 6] added: 7 
--> 001 --> [+1 => Q has: 7] added: 8 
--> 001 --> [+1 => Q has: 8] added: 9 
--> 001 --> [+1 => Q has: 9] added: 10 
--> 001 --> [+1 => Q has: 10] added: 11 
Producer stopped adding items! Added 11 items in 0.1631605 seconds! 
<-- 101 <-- [-1 => Q has: 9] took: 2 
<-- 100 <-- [-1 => Q has: 8] took: 3 
<-- 101 <-- [-1 => Q has: 7] took: 4 
<-- 102 <-- [-1 => Q has: 6] took: 5 
creating producer 555... 
Producer Task 555 (5) running! 
<-- 100 <-- [-1 => Q has: 5] took: 6 
Producer stopped adding items! Added 0 items in 1.09E-05 seconds! 
<-- 101 <-- [-1 => Q has: 4] took: 7 
<-- 102 <-- [-1 => Q has: 3] took: 8 
cancelling random consumer... 
<-- 100 <-- [-1 => Q has: 2] took: 9 
<-- 101 <-- [-1 => Q has: 1] took: 10 
<-- 102 <-- [-1 => Q has: 0] took: 11 
Consumer Task 102 cancelled 
Task 102 cancelling by request! 

Task 102 - No more items to take. Took 2 items in 2.0128301 seconds! 

Task 100 - No more items to take. Took 4 items in 4.0183264 seconds! 

Task 101 - No more items to take. Took 4 items in 4.0007338 seconds! 
cancelling random consumer... 
creating consumer 222... 

Task 222 - No more items to take. Took 0 items in 2.8E-06 seconds! 
consumer Task 222 (6) running! 
cancelling random producer... 
cancelling random consumer... 

Is this the output you expected?

Grab the whole solution, 7z'd for you, at the link below... from HERE

Download the solution!

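It took me a while to figure out the whole CancellationToken concept, but now that I'm using it, together with BlockingCollection's bulletproof-ness, I'm confident my app can handle hundreds of objects per second without messing anything up.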

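My production app will read the number of cores on the host and use that to set the initial number of consumers. I'll then adjust up and down, monitoring completion time (in aggregate), to make the most of the host's resources, knowing that the host may be doing many other things at the same time as my app.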
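A plan like this could start from something like the sketch below. Everything in it is an assumption rather than part of the original solution: starting at one consumer per logical processor (Environment.ProcessorCount), and a simple hill-climbing rule that keeps changing the consumer count in the same direction while measured throughput improves and reverses otherwise. InitialConsumers, NextStep and Apply are hypothetical helpers.

```vbnet
Imports System

Module ConsumerSizing
    ' Hypothetical starting point: one consumer per logical processor.
    Function InitialConsumers() As Integer
        Return Math.Max(1, Environment.ProcessorCount)
    End Function

    ' Hypothetical hill-climbing rule. lastStep is +1 or -1 (the last change made);
    ' the rates are items completed per second over the previous and current interval.
    Function NextStep(lastStep As Integer, previousRate As Double, currentRate As Double) As Integer
        If currentRate > previousRate Then
            Return lastStep ' the last change helped: keep going the same way
        End If
        Return -lastStep    ' no gain (or a loss): go back the other way
    End Function

    ' Applies a step while keeping the consumer count between 1 and maxConsumers.
    Function Apply(current As Integer, stepBy As Integer, maxConsumers As Integer) As Integer
        Return Math.Min(maxConsumers, Math.Max(1, current + stepBy))
    End Function

    Sub Main()
        Console.WriteLine(NextStep(1, 80.0, 95.0)) ' 1
        Console.WriteLine(NextStep(1, 95.0, 90.0)) ' -1
        Console.WriteLine(Apply(4, -1, 8))         ' 3
        Console.WriteLine(Apply(1, -1, 8))         ' 1
    End Sub
End Module
```

Comparing rates between intervals is noisy on a busy host, so in practice you might average over several intervals before acting.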

Thanks, everyone!


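If the parsers are expected to be busy all the time (or nearly so), there's no point in having more parser threads than there are CPU threads available to them. Having 10 parser threads when you only have 4 cores is pointless, because thread context switches add overhead. So allocate 3 or 4 worker threads (consumers) to service the queue. See https://stackoverflow.com/a/2670568/56778 for how to determine the number of logical processors on your system; there's no point in having more worker threads than that. Whether it makes sense to use your complex scheme of dynamically allocating workers is a matter of opinion. I'd be more inclined to have a timer that checks the queue status (item count) once a minute and have it allocate or release worker threads as appropriate, adding or removing one worker per minute to avoid overshooting the mark. That would probably make good use of the available resources.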
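The timer idea reduces to a small decision function. A sketch, assuming made-up high/low water marks of 100 and 10 queued items and a worker cap such as Environment.ProcessorCount; you would call it from a timer tick (for example a System.Threading.Timer firing once a minute) and add or remove one worker according to the result. QueueDepthPolicy and WorkerDelta are hypothetical names.

```vbnet
Imports System

Module QueueDepthPolicy
    ' Hypothetical thresholds; tune them for your workload.
    Private Const HighWater As Integer = 100 ' queue at least this long: add a worker
    Private Const LowWater As Integer = 10   ' queue shorter than this: remove a worker

    ' Returns +1, -1 or 0: at most one worker added or removed per check,
    ' never more than maxWorkers and never fewer than one.
    Function WorkerDelta(queueCount As Integer, workers As Integer, maxWorkers As Integer) As Integer
        If queueCount >= HighWater AndAlso workers < maxWorkers Then Return 1
        If queueCount < LowWater AndAlso workers > 1 Then Return -1
        Return 0
    End Function

    Sub Main()
        Console.WriteLine(WorkerDelta(250, 3, 4)) ' 1: backlog and room to grow
        Console.WriteLine(WorkerDelta(250, 4, 4)) ' 0: already at the cap
        Console.WriteLine(WorkerDelta(5, 3, 4))   ' -1: queue nearly empty
        Console.WriteLine(WorkerDelta(50, 3, 4))  ' 0: in between, leave it alone
    End Sub
End Module
```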

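Removing workers would be very easy. Create an AutoResetEvent that each thread checks every time through its loop; the first thread to see it set can exit. For example: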

private AutoResetEvent _killAThread = new AutoResetEvent(false); 

// in each thread 
while (!_killAThread.WaitOne(0) && !cancelToken.IsCancellationRequested) 
{ 
    // thread waits for an item from the queue and processes it. 
} 

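Of course, that could leave you with too many threads waiting if the queue is empty for a long time, but I get the sense you don't expect that condition to occur often (if at all).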

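And, of course, adding new consumers is simple. Your timer tick handler compares the current queue size to some threshold and spins up new threads as needed. Or, if there are too many threads, it calls _killAThread.Set(), and the next thread that finishes processing will check the event, see that it's set, and exit.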
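Here is the same AutoResetEvent idea in VB.NET as a self-contained sketch with hypothetical names: several worker loops poll the event with WaitOne(0), Set() is called once, and exactly one worker consumes the signal and exits; the rest are then shut down with a CancellationToken. Thread.Sleep stands in for taking and processing a queue item.

```vbnet
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module KillOneWorker
    ' Starts `workers` loops, signals the AutoResetEvent once, then cancels the rest.
    ' Returns how many workers exited because of the kill signal.
    Function WorkersStoppedBySignal(workers As Integer) As Integer
        Dim killed As Integer = 0
        Using killAThread As New AutoResetEvent(False)
            Using cts As New CancellationTokenSource()
                Dim token As CancellationToken = cts.Token
                Dim loops(workers - 1) As Task
                For w As Integer = 0 To workers - 1
                    loops(w) = Task.Run(Sub()
                                            Do
                                                ' WaitOne(0) only polls; an AutoResetEvent resets itself
                                                ' after releasing one caller, so one worker sees each Set()
                                                If killAThread.WaitOne(0) Then
                                                    Interlocked.Increment(killed)
                                                    Return
                                                End If
                                                If token.IsCancellationRequested Then Return
                                                Thread.Sleep(10) ' stand-in for taking and processing one item
                                            Loop
                                        End Sub)
                Next
                killAThread.Set() ' ask exactly one worker to quit
                Thread.Sleep(300)
                cts.Cancel()      ' shut the remaining workers down
                Task.WaitAll(loops)
            End Using
        End Using
        Return killed
    End Function

    Sub Main()
        Console.WriteLine(WorkersStoppedBySignal(3)) ' 1
    End Sub
End Module
```

Because each worker checks the event before the token, the signal is consumed by whichever worker polls first, even if the workers start late.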


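You're right about the cores: I mention that in the question (second paragraph) and in the last paragraph of my answer. Since BlockingCollection is "blocking" by nature, the AutoResetEvent logic escapes me: my threads are already waiting for a new item, and only the first task gets (or even sees) it. Are you suggesting a situation that my initial answer above doesn't account for? Also, I added multiple producers to make this a lab/workbench rather than something tailored to my specific needs. It's a complete test program for seeing how multiple threads affect performance on a given number of cores. Thanks for writing this up! :) –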
