2011-12-11 93 views

回答

180
  • A Pipe()只能有两个端点。

  • A Queue()可以有多个生产者和消费者。

如果需要两个以上的点通信,使用Queue()何时使用它们

如果您需要绝对性能,Pipe()要快得多,因为Queue()构建于Pipe()之上。

性能基准

让我们假设你想产卵两个过程和它们之间快速发送邮件成为可能。这些是使用Pipe()Queue()进行的类似测试之间的拖拽比赛的计时结果...这是在运行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上进行的。作为奖励,我投入了JoinableQueue()的结果; JoinableQueue()在调用queue.task_done()时占用任务(它甚至不知道具体任务,它只是计算队列中未完成的任务),因此queue.join()知道工作已完成。

每个在这个答案底部的代码...

[email protected]:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds 
Sending 100000 numbers to Pipe() took 0.328398942947 seconds 
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds 
[email protected]:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds 
Sending 100000 numbers to Queue() took 0.980564117432 seconds 
Sending 1000000 numbers to Queue() took 10.1611330509 seconds 
[email protected]:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds 
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds 
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds 
[email protected]:~$ 

综上所述Pipe()Queue()快约三倍。除非你真的必须得到好处,否则不要考虑JoinableQueue()

花絮2

多重介绍了信息流,使调试很难,除非你知道一些捷径微妙的变化。例如,在许多条件下通过字典进行索引时,您可能有一个脚本可以正常工作,但在某些输入情况下很少失败。

通常,当整个python进程崩溃时,我们会得到失败的线索;但是,如果多处理功能崩溃,则不会收到打印到控制台的未经请求的故障回溯。追踪未知的多重处理崩溃是很难的,而不知道是什么使程序崩溃。

我发现追查多进程崩溃情报的最简单方法是来包装整个多重功能在try/except和使用traceback.print_exc()

import traceback 
def reader(args): 
    try: 
     # Insert stuff to be multiprocessed here 
     return args[0]['that'] 
    except: 
     print "FATAL: reader({0}) exited while multiprocessing".format(args) 
     traceback.print_exc() 

现在,当你发现一个崩溃你看到这样:

FATAL: reader([{'crash', 'this'}]) exited while multiprocessing 
Traceback (most recent call last): 
    File "foo.py", line 19, in __init__ 
    self.run(task_q, result_q) 
    File "foo.py", line 46, in run 
    raise ValueError 
ValueError 

源代码:


""" 
multi_pipe.py 
""" 
from multiprocessing import Process, Pipe 
import time 

def reader(pipe): 
    output_p, input_p = pipe 
    input_p.close() # We are only reading 
    while True: 
     try: 
      msg = output_p.recv() # Read from the output pipe and do nothing 
     except EOFError: 
      break 

def writer(count, input_p): 
    for ii in xrange(0, count): 
     input_p.send(ii)    # Write 'count' numbers into the input pipe 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     output_p, input_p = Pipe() 
     reader_p = Process(target=reader, args=((output_p, input_p),)) 
     reader_p.start()  # Launch the reader process 

     output_p.close()  # We no longer need this part of the Pipe() 
     _start = time.time() 
     writer(count, input_p) # Send a lot of stuff to reader() 
     input_p.close()  # Ask the reader to stop when it reads EOF 
     reader_p.join() 
     print "Sending %s numbers to Pipe() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_queue.py 
""" 
from multiprocessing import Process, Queue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     if (msg == 'DONE'): 
      break 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 
    queue.put('DONE') 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = Queue() # reader() reads from queue 
          # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     reader_p.join()   # Wait for the reader to finish 
     print "Sending %s numbers to Queue() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_joinablequeue.py 
""" 
from multiprocessing import Process, JoinableQueue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     queue.task_done() 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = JoinableQueue() # reader() reads from queue 
            # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     queue.join()   # Wait for the reader to finish 
     print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, 
      (time.time() - _start)) 
+2

@Jonathan “总之管道()比快约三倍队列()” –

+0

但管道()不能安全地与多个生产者使用/消费者。 –

+11

非常好!好的答案,很好,你提供的基准!我只有两个小小的狡辩:(1)“数量级更快”有点夸张。差异是x3,大约是一个数量级的三分之一。只是说。 ;-); (2)更公平的比较将运行N个工作人员,每个工作人员通过点对点管道与主线程进行通信,而与运行N个工人的性能相比,这些工作人员都从单点到多点排队。 – JJC