我不认为你真的需要直接TPL Tasks
趴下和肮脏这一点。对于初学者,我会在BlockingCollection
上设置一个BlockingCollection
约ConcurrentQueue
(默认值),而不设置BoundedCapacity
以存储需要处理的ID。
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();
从那里我只想从BlockingCollection::GetConsumingEnumerable
返回的枚举使用Parallel::ForEach
。在ForEach
调用中,您将设置您的ParallelOptions::MaxDegreeOfParallelism
在ForEach
的正文中,您将执行您的存储过程。
现在,一旦存储过程执行完成,你就说你不想重新计划执行至少两秒钟。没问题,安排System.Threading.Timer
回调,它将简单地将ID添加回提供的回调中的BlockingCollection
。
Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4 // read this from config
},
(id) =>
{
// ... execute sproc ...
// Need to declare/assign this before the delegate so that we can dispose of it inside
Timer timer = null;
timer = new Timer(
_ =>
{
// Add the id back to the collection so it will be processed again
idsToProcess.Add(id);
// Cleanup the timer
timer.Dispose();
},
null, // no state, id wee need is "captured" in the anonymous delegate
2000, // probably should read this from config
Timeout.Infinite);
}
最后,当进程正在关闭,你会打电话BlockingCollection::CompleteAdding
从而使枚举正在与停止阻止和完整,并行处理::的ForEach将退出。例如,如果这是Windows服务,您可以在OnStop
中执行此操作。
// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
更新
你提出你的意见很关注,你可能在任何给定的点来处理大量的ID,并担心会有过多的开销,每ID的计时器。我完全同意这一点。因此,在您同时处理ID的大名单的情况下,我会使用一个计时器,每ID使用另一个队列来保存这是由一个单一的短间隔定时监控,而不是“沉睡”的ID改变。首先,您需要一个ConcurrentQueue
在其中放置是睡着的ID:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();
现在,我使用的是两部分Tuple
这里用于说明目的,但你可能要创建一个更强类型结构为它(或用using
声明至少它的别名)为更好的可读性。元组有ID,当它被放入队列代表一个DateTime。
现在,你还需要设置,将监视此队列中的计时器:
Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;
// Pull all items from the sleeping queue that have been there for at least 2 seconds
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
// Add this id back to the processing queue
idsToProcess.Enqueue(id);
}
},
null, // no state
Timeout.Infinite, // no due time
100 // wake up every 100ms, probably should read this from config
);
,那么只需在改变Parallel::ForEach
做到以下几点,而不是设置一个计时器为每个:
(id) =>
{
// ... execute sproc ...
sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}
反应性编程 – 2011-07-01 21:22:46