2009-11-24 27 views
8

我是新来的蟒蛇和threading - 做一些音乐文件转换,并希望能够利用我的机器上的多个核心(每个核心一个活动转换线程)的一些进展。如何限制python中活动线程的数量?

class EncodeThread(threading.Thread): 
    # this is hacked together a bit, but should give you an idea 
    def run(self): 
     decode = subprocess.Popen(["flac","--decode","--stdout",self.src], 
          stdout=subprocess.PIPE) 
     encode = subprocess.Popen(["lame","--quiet","-",self.dest], 
           stdin=decode.stdout) 
     encode.communicate() 

# some other code puts these threads with various src/dest pairs in a list 

for proc in threads: # `threads` is my list of `threading.Thread` objects 
    proc.start() 

一切正常,所有的文件得到编码,bravo! ...然而,所有的进程立即产卵,但我只想一次运行两个(每个核心一个)。一旦完成,我希望它移动到列表中的下一个,直到完成,然后继续执行程序。

我该怎么做?

(我已经看过了线程池和队列功能,但我无法找到一个简单的答案。)

编辑:也许我应该补充一点,我的每个线程的使用subprocess.Popen运行单独的命令行解码器(flac)通过管道输入命令行编码器(lame/mp3)。

+0

为什么要麻烦?让你的线程相互竞争有什么问题?让每个核心彻底饱和工作会更快。 – 2009-11-24 02:34:43

+0

好吧,我想我没有这样想过......拥有超过2000个文件的音乐库,我认为同时产生(同时)2,000个解码过程(flac)到2,000个编码过程(跛脚)将是次优的。我错了吗? – thornomad 2009-11-24 02:39:53

+0

@thornomad:是的,你错了。由于您拥有2个内核,因此仅限于2个进程是错误的。一个过程不会让工作核心饱和。即使是由三部分组成的流程管线也可能具有足够的I/O,以致内核未被完全占用。 – 2009-11-24 03:08:25

回答

4

“我的每个线程都使用subprocess.Popen来运行单独的命令行[process]”。

为什么有一堆线程管理一堆进程?这正是操作系统为你做的。为什么微管理操作系统已经管理的内容?

与其监督进程的线程无关,只是分离进程。你的进程表可能无法处理2000个进程,但它可以很容易地处理几十个(也许几百个)进程。

你想有更多工作比你的CPU可以处理排队。真正的问题是内存之一 - 而不是进程或线程。如果所有进程的所有活动数据的总和超过物理内存,则必须交换数据,这会降低速度。

如果你的进程有一个相当小的内存占用,你可以有很多很多的运行。如果你的进程有很大的内存占用,你不能有很多的运行。

+0

heh。我现在看到了我被黑了一起的方法 - 这有点多余。那么,是否有一种子进程管理“池”的方式(正如其他人所建议的那样)。感谢您的输入。随着我的学习......只是使用'subprocess.poll()'来查看所做的事情以及仍在运行的事情?再次感谢。 – thornomad 2009-11-24 03:14:03

+0

正确。你可以使用一组简单的过程;删除完成的。添加一个,并将该集合的大小保持在某个限制之下。这只是一个具有'add'和'remove'的集合。 – 2009-11-24 04:01:26

1

如果您使用的是默认的“cpython”版本,那么这不会对您有所帮助,因为一次只能执行一个线程;看起来Global Interpreter Lock。相反,我建议在Python 2.6中查看multiprocessingmodule - 它使得并行编程变得简单。您可以使用2*num_threads进程创建一个Pool对象,并为其提供一些任务。它将一次执行最多2*num_threads个任务,直到完成所有任务。

在工作中,我最近迁移了一堆Python XML工具(一个不同的xpath grepper和bulk xslt转换器)来使用这个工具,并且每个处理器有两个进程有非常好的结果。

+1

如果你的子进程将执行函数在你的Python代码中,多处理模块非常棒。如果你正在调用外部程序,那么这个模块不会提供优于子进程模块的优势...因为这些外部程序不会有任何方法将结果返回给临时文件或管道以外的父级。多处理模块的巨大IPC优势在您执行的外部程序中丢失。 (例如,让多进程调用子进程中的每个进程听起来非常愚蠢)。 – 2009-11-25 04:46:55

0

我不是这方面的专家,但我读了一些关于“锁”的内容。 This article可能会帮助你

希望这有助于

1

它看起来对我说,你想要的是某种形式的游泳池,并在泳池您希望有n个线程,其中n ==处理器数量在你的系统上。然后,您将拥有另一个线程,其唯一的工作是将作业提交给队列,工作线程可以在队列中自由选择并处理它们(因此对于双代码机器,您将有三个线程,但主线程会做很少)。

因为你是Python的新手,虽然我会假设你不知道GIL,它是关于线程的副作用。如果您阅读我关联的文章,您很快就会明白,为什么传统的多线程解决方案在Python世界中并不总是最好的。相反,你应该考虑使用multiprocessing模块(Python 2.6中的新功能,在2.5中可以使用use this backport)来实现相同的效果。它通过使用多个进程来解决GIL的问题,就好像它们是同一应用程序中的线程一样。对于如何共享数据(您正在不同的内存空间中工作)有一些限制,但实际上这并非坏事:它们只是鼓励良好实践,如最小化线程之间的接触点(或本例中的进程)。

在你的情况下,你可能会使用指定here的泳池。

+0

谢谢 - 我会看看多进程......我编辑了我的问题以获得更多详细信息......看起来,subprocess.Popen确实有点中断并做自己的事情。 – thornomad 2009-11-24 02:50:57

+0

多处理模块BTW对于2.6(来自支持2.4和2.5的pyprocessing第三方模块)来说是一个很好的补充。 但是,它不适合运行外部程序。多处理模块的主要优点在于它在线程支持后建模。您可以创建Queue()作为主要的内部(线程/进程)通信机制,以消除您自己显式锁定的大部分需求。 (Queue()为任意对象的多个生产者和消费者提供一致的支持)。如果孩子们运行Python代码很好。 – 2009-11-25 04:41:02

1

简答:不要使用线程。

对于一个工作的例子,你可以看看我最近在工作中扔在一起的东西。这是一个围绕ssh的小包装,它运行一个可配置数量的Popen()子过程。我已将它发布在:Bitbucket: classh (Cluster Admin's ssh Wrapper)

如上所述,我不使用线程;我刚从孩子身上产生,循环他们呼叫他们的.poll()方法并检查超时(也可配置)并在收集结果时补充池。我已经玩过不同的sleep()值,并且在过去我已经编写了一个版本(在子进程模块被添加到Python之前),其中使用了信号模块(SIGCHLD和SIGALRM)和os.fork()os.execve()函数---我的管道和文件描述符管道等)。

在我的情况下,我逐渐打印结果,因为我收集它们...并记住所有这些结果总结(当所有作业已完成或因超时超时而死亡时)。

我在25,000个内部主机(其中许多已关闭,退休,位于国际上,无法访问我的测试帐户等)上发布此信息。它在两个多小时内完成了工作,没有任何问题。 (其中大约60个是由于系统处于退化/颠簸状态而超时 - 证明我的超时处理工作正常)。

所以我知道这个模型可靠地工作。使用此代码运行100个当前的ssh进程似乎不会引起任何明显的影响。 (这是一个适中的FreeBSD盒子)。我曾经运行旧的(前子进程)版本,我的旧512MB笔记本电脑上有100个并发进程,没有问题。 (顺便说一下:我打算清理它并为其添加功能;随意贡献或克隆自己的分支;这就是Bitbucket.org的用处)。

+0

谢谢 - 我今天会更仔细地看一下。我很快就想到了一个非常简单的while循环,它似乎只是检查'p.communicate()'方法。 (PS:我认为你在源代码的第4行丢失了一个关闭''''')。 – thornomad 2009-11-24 12:34:46

30

如果要限制并行的线程数,使用semaphore

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads) 

class EncodeThread(threading.Thread): 

    def run(self): 
     threadLimiter.acquire() 
     try: 
      <your code here> 
     finally: 
      threadLimiter.release() 

启动所有线程一次。除maximumNumberOfThreads之外的所有内容都将在threadLimiter.acquire()中等待,等待的线程只会在另一个线程通过threadLimiter.release()后才会继续。

+1

这完全回答了最初的问题。非常适合那些最终在Google上搜索的用户。 – 2013-06-22 03:10:26