2013-03-17 25 views
0

我一直在试图找出如何解决的要求,我有,但对我的生活我只是不能拿出一个解决方案。去队列中的项目与工作线程

我有一个项目数据库,其中存储他们的一种队列。 (数据库已经实现和其他进程将被添加项目到这个队列。)

该项目需要大量的工作/时间“过程”,所以我需要能够: 不断去排队来自数据库的项目。 对于每个项目运行一个新线程并处理该项目,然后返回true/false它成功处理。 (这将用于重新将其添加到数据库队列中)

但是,只有在当前活动线程数(每个正在处理的项目数)少于最大线程数参数的情况下执行此操作。

一旦最大线程数已经达到我需要停下来解除队列从数据库项目,直到线程的当前数目小于线程的最大数量。 需要继续排队项目。

感觉就像这应该是我能想出但它只是没有向我涌来。

澄清:我只需要实现线程。数据库已经实施。

+1

有几个部分对此,太多地址在一个职位。将任务分解为更小的部分,然后编写代码,然后在遇到较小的部分时发布更具体的问题。 – mbeckish 2013-03-17 01:12:03

+0

如果你的数据库是MSSQL 2005或更新的,我建议寻找Service Broker。 – 2013-03-17 01:19:53

回答

6

一个非常简单的方法是使用Semaphore。您有一个线程可将项目出列并创建线程来处理它们。例如:

const int MaxThreads = 4; 
Semaphore sem = new Semaphore(MaxThreads, MaxThreads); 
while (Queue.HasItems()) 
{ 
    sem.WaitOne(); 
    var item = Queue.Dequeue(); 
    Threadpool.QueueUserWorkItem(ProcessItem, item); // see below 
} 
// When the queue is empty, you have to wait for all processing 
// threads to complete. 
// If you can acquire the semaphore MaxThreads times, all workers are done 
int count = 0; 
while (count < MaxThreads) 
{ 
    sem.WaitOne(); 
    ++count; 
} 

// the code to process an item 
void ProcessItem(object item) 
{ 
    // cast the item to whatever type you need, 
    // and process it. 
    // when done processing, release the semaphore 
    sem.Release(); 
} 

上述技术工作得很好。编码简单,易于理解,非常有效。

一个变化是您可能想要使用Task API而不是Threadpool.QueueUserWorkItemTask可让您更好地控制异步处理,包括取消。在我的例子中我使用了QueueUserWorkItem,因为我更熟悉它。我会在生产程序中使用Task

虽然这确实使用N + 1个线程(其中N是要同时处理的项目数量),但该额外线程通常不会执行任何操作。它运行的唯一时间是将工作分配给工作线程的时间。否则,它会在信号量上进行非忙等待。

0

你只是不知道从哪里开始?

考虑具有最大线程数的线程池。 http://msdn.microsoft.com/en-us/library/y5htx827.aspx

考虑立即旋转起来线程的最大数量和监测数据库。 http://msdn.microsoft.com/en-us/library/system.threading.threadpool.queueuserworkitem.aspx很方便。

请记住,你不能保证你的程序将被安全地结束...崩溃发生。考虑处理状态的记录。

记住,你的选择,并从队列中删除的操作应该是原子的。

0

好了,所以该解决方案的体系结构将依赖于一两件事:确实每队列项的处理时间根据不同项目的数据?

如果没有,那么你可以拥有的东西,处理线程之间仅仅是圆知更鸟。这将很容易实现。

如果处理时间也有所不同,那么你会需要更多的一个“下一个可用”的感觉它的东西,所以无论你的线程恰好是免费的第一被赋予处理数据项的工作。

工作过说出来,然后你将有怎样一个队列读取器和处理线程之间同步的周围通常运行。 'next-available'和'round-robin'之间的区别在于你如何进行同步。

我不是太熟悉C#,但我听说兽告诉叫做后台工作。这很可能是一个可以接受的方法。

对于循环赛,只启动每个队列项的后台工作,在存储阵列中的工人引用。只限于16位正在进步的后台工作人员。这个想法是,你已经开始了16,然后等待第一个完成,直到17号开始,等等。我相信后台工作者实际上是作为线程池上的作业运行的,所以这会自动地限制实际运行的线程数量,以适应​​底层硬件的需求。要等待后台工作人员,请参阅this。等待后台工作人员完成后,您会处理结果并开始另一个工作。

对于下一个可用的方法,它没有那么不同。与其等待第一个完成,您将使用WaitAny()等待任何工作人员完成。你从任何一个完成处理返回,然后开始另一个回到WaitAny()。

这两种方法的一般理念是保持一些线程一直在沸腾。下一个可用方法的特点是,您发出结果的顺序不一定与输入项目的顺序相同。如果那么重要,那么比CPU核心拥有更多后台工作人员的循环方法将会相当高效(线程池将会刚刚开始调试,但还没有运行的工作人员)。然而,延迟会随处理时间而变化。

BTW 16是根据您认为在运行该软件的PC上有多少核心来选择的任意数字。更多的核心,更大的数量。

当然,在看似不安定且不断变化的.NET世界中,现在可能有更好的方法来做到这一点。

祝你好运!