2012-12-13 35 views
1

最n个线程的情景:Python的队列 - 必须在运行

我有一个非常大的数据库模型迁移持续了一个新的版本,和我的工作boilerplating我们将如何去迁移电流的实时数据从webapp进入本地测试数据库。

我想在python中设置一个脚本,它将同时处理我的模型的迁移。我的模型实例有from_legacyto_legacy方法。我到目前为止加载了所有实例,并为每个实例创建了threads,每个线程从核心threading模块中分类,并使用run方法进行转换并保存结果。

我想让程序中的主循环构建一大堆这些线程的实例,并开始一个接一个地处理它们,并且最多只能同时运行10个工作,接下来在其他人完成迁移时进行处理。

我弄不清楚的是如何正确使用队列来做到这一点?如果每个线程都代表完整的迁移任务,我应该先加载所有实例,然后创建一个Queue并将maxsize设置为10,并且只跟踪当前正在运行的队列?也许这样的事情?

currently_running = Queue() 
for model in models: 
    task = Migrate(models) #this is subclassed thread 
    currently_running.put(task) 
    task.start() 

在这种情况下,依靠put调用来阻止它在容量?如果我要走这条路线,我将如何拨打task_done

或者更确切地说,Queue是否应该包含所有的任务(不仅仅是开始的任务)并且使用join来阻止完成?在线程队列上调用join是否会启动包含的线程?

解决“最多有N个正在运行的线程”问题以及队列扮演什么角色的最佳方法是什么?

+0

你不想那样做,因为当正在处理的作业,也不会在等待队列工作。你需要一个“在飞行中”工作的队列,这似乎很愚蠢。只需创建你想要的线程数。没有“包含线程”这样的事情,但只要每个线程在其作业完成之前都不会终止(包括等待任何线程成为其中的一部分),那么只需等待主线程即可。 –

+0

因此,创建10个线程,它们自己被馈送到模型实例,而不是每个线程?如何在每次完成迁移后重新启动它们? – DeaconDesperado

+0

不要重新启动它们。线程只是将队列从队列中拉出,直到队列为空。 –

回答

4

Although not documented,multiprocessing模块有一个ThreadPool类,顾名思义,它创建了一个线程池。它共享same API as the multiprocessing.Pool class

然后,您可以使用pool.apply_async发送任务的线程池:

import multiprocessing.pool as mpool 

def worker(task): 
    # work on task 
    print(task)  # substitute your migration code here. 

# create a pool of 10 threads 
pool = mpool.ThreadPool(10) 
N = 100 

for task in range(N): 
    pool.apply_async(worker, args = (task,)) 

pool.close() 
pool.join() 
+0

这看起来不错!把我完整的任务列表中的'len'替换为'N'可以吗?游泳池将跟踪生活的情况? – DeaconDesperado

+0

另外,arg'task'应该是一个进程/线程,或者只是一个代表该进程的函数? – DeaconDesperado

+0

@DeaconDesperado:是的,您可以使用任务列表的“len”来代替'N'。 “任务”不应该是一个线程。让'pool'协调你的线程。相反,让'task'成为你需要传递给'worker'的任何对象或数据来区分一个迁移任务和另一个迁移任务。考虑到你发布的代码,我认为'task'应该等于'model',但我不清楚'from_legacy'或'to_legacy'在做什么,所以我可能是错的。 – unutbu

0

这可能应该使用信号灯来完成,documentation中的示例暗示了您要完成的任务。

相关问题