2011-06-10 99 views
6

我目前正在研究一个项目,在那里我们有挑战来并行处理项目。到目前为止没有什么大不了的;) 现在到了这个问题。我们有一个ID列表,我们定期(每2秒)为每个ID调用一个StoredProcedure。 需要为每个项目单独检查2秒,因为它们是在运行时添加和删除的。 另外我们要配置最大并行度,因为数据库不应该同时被300个线程充斥。 正在处理的项目不应被重新计划处理,直到完成前一个执行。原因是我们想要防止排队很多项目,以防DB延迟。TPL体系结构问题

现在我们正在使用一个自主开发的组件,它有一个主线程,它定期检查哪些项目需要安排处理。一旦它有了这个列表,它将把它们放在一个自定义的基于IOCP的线程池中,然后使用waithandles等待正在处理的项目。然后下一次迭代开始。 IOCP,因为它提供了工作窃取。

我想用TPL/.NET 4版本替换这个自定义实现,我想知道你将如何解决它(理想情况下简单,很好可读/可维护)。 我知道这篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx,但它只是限制正在使用的线程数量。叶工作偷窃,定期执行的项目....

理想情况下,它将成为一个通用的组件,可以用于一些所有需要定期完成的项目列表的任务。

任何输入欢迎, TIA 马丁

+0

反应性编程 – 2011-07-01 21:22:46

回答

9

我不认为你真的需要直接TPL Tasks趴下和肮脏这一点。对于初学者,我会在BlockingCollection上设置一个BlockingCollectionConcurrentQueue(默认值),而不设置BoundedCapacity以存储需要处理的ID。

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service) 
BlockingCollection<string> idsToProcess = new BlockingCollection<string>(); 

从那里我只想从BlockingCollection::GetConsumingEnumerable返回的枚举使用Parallel::ForEach。在ForEach调用中,您将设置您的ParallelOptions::MaxDegreeOfParallelismForEach的正文中,您将执行您的存储过程。

现在,一旦存储过程执行完成,你就说你不想重新计划执行至少两秒钟。没问题,安排System.Threading.Timer回调,它将简单地将ID添加回提供的回调中的BlockingCollection

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(), 
    new ParallelOptions 
    { 
     MaxDegreeOfParallelism = 4 // read this from config 
    }, 
    (id) => 
    { 
     // ... execute sproc ... 

     // Need to declare/assign this before the delegate so that we can dispose of it inside 
     Timer timer = null; 

     timer = new Timer(
      _ => 
      { 
       // Add the id back to the collection so it will be processed again 
       idsToProcess.Add(id); 

       // Cleanup the timer 
       timer.Dispose(); 
      }, 
      null, // no state, id wee need is "captured" in the anonymous delegate 
      2000, // probably should read this from config 
      Timeout.Infinite); 
    } 

最后,当进程正在关闭,你会打电话BlockingCollection::CompleteAdding从而使枚举正在与停止阻止和完整,并行处理::的ForEach将退出。例如,如果这是Windows服务,您可以在OnStop中执行此操作。

// When ready to shutdown you just signal you're done adding 
idsToProcess.CompleteAdding(); 

更新

你提出你的意见很关注,你可能在任何给定的点来处理大量的ID,并担心会有过多的开销,每ID的计时器。我完全同意这一点。因此,在您同时处理ID的大名单的情况下,我会使用一个计时器,每ID使用另一个队列来保存这是由一个单一的短间隔定时监控,而不是“沉睡”的ID改变。首先,您需要一个ConcurrentQueue在其中放置是睡着的ID:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>(); 

现在,我使用的是两部分Tuple这里用于说明目的,但你可能要创建一个更强类型结构为它(或用using声明至少它的别名)为更好的可读性。元组有ID,当它被放入队列代表一个DateTime。

现在,你还需要设置,将监视此队列中的计时器:

Timer wakeSleepingIdsTimer = new Timer(
    _ => 
    { 
     DateTime utcNow = DateTime.UtcNow; 

     // Pull all items from the sleeping queue that have been there for at least 2 seconds 
     foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) 
     { 
      // Add this id back to the processing queue 
      idsToProcess.Enqueue(id); 
     } 
    }, 
    null, // no state 
    Timeout.Infinite, // no due time 
    100 // wake up every 100ms, probably should read this from config 
); 

,那么只需在改变Parallel::ForEach做到以下几点,而不是设置一个计时器为每个:

(id) => 
{ 
     // ... execute sproc ... 

     sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
} 
+0

好主意,但你不觉得这会造成一点资源问题吗?我的意思是如果我有列表中的500个元素,我有点担心大额正在运行的定时器... – 2011-06-14 07:27:34

+0

想过,但你并没有给出明确的界限,所以我在等待一个响应,这是否还是没有遇见你需要。您可以轻松地与另一队列和单计时器,监视是由于项目的队列和移动它们拖回的主要工作队列解决这个问题。将添加详细信息到我的答案。 – 2011-06-14 14:27:04

1

这是非常相似的,你说你已经有了你的问题的办法,但TPL任务这样做。一项任务只是将其自身添加回到要安排的事项清单中。

使用锁定一个普通的名单上的是在这个例子相当难看,可能会希望有一个更好的收集保存的事情的清单来安排

// Fill the idsToSchedule 
for (int id = 0; id < 5; id++) 
{ 
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id)); 
} 

// LongRunning will tell TPL to create a new thread to run this on 
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning); 

启动了SchedulingLoop,其实际执行检查是否已经跑了两秒钟

// Tuple of the last time an id was processed and the id of the thing to schedule 
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>(); 
static int currentlyProcessing = 0; 
const int ProcessingLimit = 3; 

// An event loop that performs the scheduling 
public static void SchedulingLoop() 
{ 
    while (true) 
    { 
     lock (idsToSchedule) 
     { 
      DateTime currentTime = DateTime.Now; 
      for (int index = idsToSchedule.Count - 1; index >= 0; index--) 
      { 
       var scheduleItem = idsToSchedule[index]; 
       var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds; 

       // start it executing in a background task 
       if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit) 
       { 
        Interlocked.Increment(ref currentlyProcessing); 

        Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun); 

        // Schedule this task to be processed 
        Task.Factory.StartNew(() => 
         { 
          Console.WriteLine("Executing {0}", scheduleItem.Item2); 

          // simulate the time taken to call this procedure 
          Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500); 

          lock (idsToSchedule) 
          { 
           idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2)); 
          } 

          Console.WriteLine("Done Executing {0}", scheduleItem.Item2); 
          Interlocked.Decrement(ref currentlyProcessing); 
         }); 

        // remove this from the list of things to schedule 
        idsToSchedule.RemoveAt(index); 
       } 
      } 
     } 

     Thread.Sleep(100); 
    } 
}