[注意:行号是指在原博客文章here上插入蜡笔的Wordpress。在该链接,你也可以下载整个VS 2012的解决方案的7Z]
因此,这是这个想法:
让我们看看我是如何解决阻塞收集问题然后把我们的工作放到解析器任务中......我使用了一种非常通用且易于重用的生产者和消费者方法。事实上,在我的例子中,你甚至可以调整生产者的数量。您可以在下面调整我的代码以更接近您的工作,然后调整线程/任务/消费者的数量,以查看并行性对您的工作有多好。
首先,在进口通常的嫌疑人..
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Module modStartHere
'
' Producer
'
Dim itemsToProduce = 10
Dim sleepProducer = 10 ' in milliseconds
Dim producerStartID = 1
Dim producersNumToStart = 1
Dim ProducerCTSs As New ConcurrentBag(Of CancellationTokenSource)
Dim moreItemsToAdd As Boolean = True
'
' Consumer
'
Dim sleepConsumer = 1000 ' in milliseconds
Dim consumerStartID = 100
Dim consumersNumToStart = 3
Dim ConsumerCTSs As New ConcurrentBag(Of CancellationTokenSource)
生产者(一个或多个)的初始设置与上述。尽管itemsToProduce在程序期间没有改变,但生产者的数量将会减少。作为一个草案,这是非常粗糙的,在某些时候它无疑会在你自己的代码中精简,但是这表明了如何很好地解决这个问题。
我用“IDs”在输出中显示哪个线程正在做什么。他们只生产所需的就是CTS的实例列表:
'
' the multi-thread-safe queue that is produced to and consumed from
'
Dim bc As New BlockingCollection(Of Integer)
'
' this # will be what is actually produced & consumed (1, 2, 3, ...)
'
Dim itemId As Integer = 0
'
这里的主要机器是一个小行:
昏暗BC作为新BlockingCollection(整数)
微软说:
BlockingCollection概述.NET Framework 4。5
BlockingCollection(OF T)是一个线程安全集合类 提供以下功能:
An implementation of the Producer-Consumer pattern.
Concurrent adding and taking of items from multiple threads.
Optional maximum capacity.
Insertion and removal operations that block when collection is empty or full.
Insertion and removal "try" operations that do not block or that block up to a specified period of time.
Encapsulates any collection type that implements IProducerConsumerCollection(Of T)
Cancellation with cancellation tokens.
Two kinds of enumeration with foreach (For Each in Visual Basic):
-Read-only enumeration.
-Enumeration that removes items as they are enumerated.
的itemId只是保持所述假有效载荷的变量。生产者会将其增加1以模拟不同的对象实例或工作单元。您只需更改BlockingCollection保留的类型...
现在我不是以FIFO方式(我将在制作中)执行此操作,但您可以按照Microsoft的方式执行此操作,或者甚至可以执行FILO操作:
当你创建一个BlockingCollection(中T)的对象,您可以指定不仅 的有限容量,而且收集的使用类型。例如,对于 示例,您可以为先进先出(FIFO)行为的第一个 指定一个ConcurrentQueue(Of T)对象,或者为先进先出(LIFO)行为指定一个ConcurrentStack(Of T)对象。
现在这很有用!在这个演示中,我做了所有的事情......但就像我说的,对于我的特定需求,我需要FIFO,如顶部的图中所示...
稍后,您将看到函数和子程序,但这里的真正神奇的是在2集 - 一个生产者和一个消费者:
昏暗ProducerCTSs作为新ConcurrentBag(中CancellationTokenSource) 昏暗ConsumerCTSs作为新ConcurrentBag(中CancellationTokenSource)
The Magic: As each Task(thread) is either created or closed, the corresponding CancellationTokenSource is either added or removed from the appropriate collection above.
说真的,就是这样! :)
下一个代码,初始生产者和消费者创建:
'===============================
'
' start demo
'
Sub Main()
'
'===============================
'
' initial state:
'
' start our producer(s)
'
For ps As Integer = producerStartID To producerStartID + producersNumToStart - 1
CreateTask(ps, "Producer")
Next
'
' start our consumer(s)
'
For cs As Integer = consumerStartID To consumerStartID + consumersNumToStart - 1
CreateTask(cs, "Consumer")
Next
'
'=========================================
除了少数Thread.sleep()方法调用,所有的下一部分不被添加或删除生产者和消费者的任务(线程)。您可以改变顶部的初始值以使其通过步骤。
创建任务...... - 的CreateTask(,<“生产者”或“消费者”>)
要删除一个任务,你(在一行中)都得到一个随机CTS,然后.Cancel ()它:
GetRandomCTS(ProducerCTSs).Cancel() GetRandomCTS(ConsumerCTSs).Cancel()
GetRandomCTS()采用CTS情况的收集,挑选一个随机的,然后调用取消( ) 在上面。
'
Thread.Sleep(2000)
'
' create a producer
'
Console.WriteLine("creating producer 555...")
CreateTask(555, "Producer")
Thread.Sleep(1000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
Thread.Sleep(2000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
Thread.Sleep(1000)
'
' create a consumer
'
Console.WriteLine("creating consumer 222...")
CreateTask(222, "consumer")
Thread.Sleep(1000)
'
' cancel a producer
'
Console.WriteLine("cancelling random producer...")
GetRandomCTS(ProducerCTSs).Cancel()
Thread.Sleep(1000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
'
'==========================================
'
Console.ReadLine()
末次
,就是这样!
现在的有趣的部分:7
#Region "Utilites"
''' <summary>
''' Retrieves a random cancellation token source from the given list of current threads...
''' Works for either producer or consumer
''' </summary>
''' <param name="ctsBag">ConcurrentBag(Of CancellationTokenSource)</param>
''' <returns>CancellationTokenSource</returns>
''' <remarks></remarks>
Function GetRandomCTS(ctsBag As ConcurrentBag(Of CancellationTokenSource)) As CancellationTokenSource
Dim cts As CancellationTokenSource = Nothing
Dim rndNum As Random = Nothing
Dim rndIndex As Integer = Nothing
Try
If ctsBag.Count = 1 Then
Console.WriteLine("There are no threads to cancel!")
Else
rndNum = New Random(12345)
rndIndex = rndNum.Next(0, ctsBag.Count - 1) ' because ElementAt() is zero-based index
cts = ctsBag.ElementAt(rndIndex)
End If
Catch ex As Exception
Console.WriteLine("GetRandomCTS() Exception: " & ex.StackTrace)
End Try
Return cts
End Function
行:这就是我们将要返回时,一个CancellationTokenSource
第16行:ctsBag.ElementAt()使我们能够拿出一个具体的CTS实例按编号。
下面,CreateTask为您希望它在运行时显示的参数(仅用于演示,查看哪个线程在做什么)以及一个字符串来告诉您是否需要一个新的Consumer的Producer。当然,我可以让它更复杂,但这只是一个粗略的草案。 :)
Private Function CreateTask(taskId As Integer, taskType As String) As CancellationTokenSource
Dim t As Task = Nothing
Dim cts As New CancellationTokenSource()
Dim token As CancellationToken = cts.Token
Try
If taskType.ToLower = "producer" Then
t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning)
ProducerCTSs.Add(cts)
ElseIf taskType.ToLower = "consumer" Then
t = Task.Factory.StartNew(Sub() Consumer(taskId, token), token, TaskCreationOptions.LongRunning)
ConsumerCTSs.Add(cts)
Else
End If
Console.WriteLine("{0} Task {1} ({2}) running!", taskType, taskId.ToString("000"), t.Id)
Catch ex As Exception
Console.WriteLine("Task {0} CreateTask({1}) Exception: ", taskId.ToString("000"), taskType & ex.StackTrace)
End Try
Return cts
End Function
#End Region
7号线& 10:这些调用生产者()或客户()班下面,通过他们的CancellationTokenSource需要,让他们能够在不损坏任何数据运行的同时优雅被取消。
T = Task.Factory.StartNew(子()监制(任务id,令牌),令牌,TaskCreationOptions.LongRunning)
你有没有注意到TaskCreationOptions.LongRunning?这对我来说很好,它通过告诉程序不要太担心取消过于紧密会发生什么情况而提高性能。
那么什么是生产者()类是什么样子?
#Region "Producer(s)"
Public Sub Producer(ByVal taskNum As Integer, ByVal ct As CancellationToken)
' Was cancellation already requested?
If ct.IsCancellationRequested = True Then
Console.WriteLine("Producer Task {0} was cancelled before Producer thread created!", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
'
'Dim r As Random = New Random(123)
Dim sw As New Stopwatch
Dim numAdded As Integer = 0
sw.Start()
While moreItemsToAdd = True
' Dim itemIn As Integer = r.Next(1, 1000)
itemId += 1 ' the payload
Try
bc.Add(itemId)
Console.WriteLine("--> " & taskNum.ToString("000") & " --> [+1 => Q has: " & bc.Count & "] added: " & itemId)
numAdded += 1
If ct.IsCancellationRequested Then
Console.WriteLine("Producer Task {0} cancelled", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
Thread.Sleep(sleepProducer)
Catch ex As OperationCanceledException
Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!")
Exit While
Catch ex As Exception
Console.WriteLine("Producer() Exception: " & ex.StackTrace)
End Try
If bc.Count >= itemsToProduce Then
moreItemsToAdd = False
End If
End While
sw.Stop()
' Let consumer know we are done.
Console.WriteLine("Producer stopped adding items! Added " & numAdded & " items in " & sw.Elapsed.TotalSeconds & " seconds!")
bc.CompleteAdding()
End Sub
#End Region
我知道,我知道......它看起来很复杂!但事实并非如此。我不是那么聪明! 1/2的代码是刚好赶上&处理取消请求,这样的处理不会损坏任何数据。那是一种俗气的StopWatch()来计时......事实上,早期版本的工件仍然被注释掉了。就像我说的“粗” ......
第17行:只需添加的itemId(我们的有效载荷,可以是任何东西)的BlockingCollection(BC)。
第20行:如果取消了,我们照顾在这里,函数的不是一个随机的部分,这将可能腐败的各种事情...
第31行:我加入这个作为俗气的方式告诉生产者何时停止......生产。该变量(限制)设置在代码的顶部。
第38行:bc.CompleteAdding() - 这是使用BC(BlockingCollection)会有没有更多的项目增加了一个信号给大家。这样,消费者知道何时停止...消费!
“他们为什么要这么做?”
好吧,假设你想短时间运行的任务,或者任务,要知道他们为了继续进行做......是的,在我的情况,他们是长时间运行,并在生产中我会需要使用“TaskCreationOptions”开始每个任务。LongRunning“
消费者()类几乎是相同的,只是一些微小的差异:
#Region "Consumer(s)"
Public Sub Consumer(ByVal taskNum As Integer, ByVal ct As CancellationToken)
If ct.IsCancellationRequested = True Then ' Was cancellation already requested?
Console.WriteLine("Consumer Task {0} was cancelled before Consumer thread created!", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
Dim totalTaken As Integer = 0
Dim sw As New Stopwatch
sw.Start()
While bc.IsCompleted = False
Dim itemOut As Integer = Nothing ' the payload
Try
itemOut = bc.Take()
Console.WriteLine("<-- " & taskNum.ToString("000") & " <-- [-1 => Q has: " & bc.Count & "] took: " & itemOut)
If ct.IsCancellationRequested Then
Console.WriteLine("Consumer Task {0} cancelled", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
totalTaken += 1
Catch ex As OperationCanceledException
Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!")
Exit While
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (Not itemOut = Nothing) Then
Thread.Sleep(sleepConsumer)
End If
End While
sw.Stop()
If bc.IsCompleted = True Then
Console.WriteLine(vbCrLf & "Task " & taskNum.ToString("000") & " - No more items to take. Took " & totalTaken & " items in " & sw.Elapsed.TotalSeconds & " seconds!")
End If
End Sub
#End Region
End Module
3号线:在两个班,我们一定要在右上方检查,看看我们我们没有时间或资源,如果另一个任务/线程做了最后一项工作,就像我们正在实例化一样。下一个项目(取决于先前的讨论中配置的FIFO或FILO/LIFO,这个BlockingCollection完成了这一切!
当你坐下来看看它时,这堂课的所有其他代码只是打扮第13行!
所以让我们把这只小狗开火!
Producer Task 001 (1) running!
--> 001 --> [+1 => Q has: 1] added: 1
<-- 100 <-- [-1 => Q has: 0] took: 1
Consumer Task 100 (2) running!
Consumer Task 101 (3) running!
Consumer Task 102 (4) running!
--> 001 --> [+1 => Q has: 1] added: 2
--> 001 --> [+1 => Q has: 2] added: 3
--> 001 --> [+1 => Q has: 3] added: 4
--> 001 --> [+1 => Q has: 4] added: 5
--> 001 --> [+1 => Q has: 5] added: 6
--> 001 --> [+1 => Q has: 6] added: 7
--> 001 --> [+1 => Q has: 7] added: 8
--> 001 --> [+1 => Q has: 8] added: 9
--> 001 --> [+1 => Q has: 9] added: 10
--> 001 --> [+1 => Q has: 10] added: 11
Producer stopped adding items! Added 11 items in 0.1631605 seconds!
<-- 101 <-- [-1 => Q has: 9] took: 2
<-- 100 <-- [-1 => Q has: 8] took: 3
<-- 101 <-- [-1 => Q has: 7] took: 4
<-- 102 <-- [-1 => Q has: 6] took: 5
creating producer 555...
Producer Task 555 (5) running!
<-- 100 <-- [-1 => Q has: 5] took: 6
Producer stopped adding items! Added 0 items in 1.09E-05 seconds!
<-- 101 <-- [-1 => Q has: 4] took: 7
<-- 102 <-- [-1 => Q has: 3] took: 8
cancelling random consumer...
<-- 100 <-- [-1 => Q has: 2] took: 9
<-- 101 <-- [-1 => Q has: 1] took: 10
<-- 102 <-- [-1 => Q has: 0] took: 11
Consumer Task 102 cancelled
Task 102 cancelling by request!
Task 102 - No more items to take. Took 2 items in 2.0128301 seconds!
Task 100 - No more items to take. Took 4 items in 4.0183264 seconds!
Task 101 - No more items to take. Took 4 items in 4.0007338 seconds!
cancelling random consumer...
creating consumer 222...
Task 222 - No more items to take. Took 0 items in 2.8E-06 seconds!
consumer Task 222 (6) running!
cancelling random producer...
cancelling random consumer...
这是您预期的输出吗?
抢7z'd为你,在下面的链接上的整个解决方案......从HERE
下载解决方案!
我花了一段时间才能找出整个的CancellationToken的概念,但现在,我使用它,并且BlockingCollection的防弹的烦躁,我相信我的应用程序可以处理上百个对象每秒不会搞乱任何东西。
我的生产应用程序将读取主机上的核心数量,并使用它来设置使用者的初始数量。然后,我将调整向下,监视完成的时间(以总体方式),从而充分利用主机资源,了解主机可能与我的应用程序同时执行许多其他操作。
谢谢大家!
问这个问题是否已经有解决方案的要点是什么? –
@JimMischel [与他人分享](http://blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) – svick
@svick:谢谢。我会记住这一点。 –