Python's multiprocessing package中队列和管道之间的根本区别是什么?Python多处理 - 管vs队列
在哪种情况下应该选择哪一种?什么时候使用Pipe()
有利?什么时候使用Queue()
有什么好处?
Python's multiprocessing package中队列和管道之间的根本区别是什么?Python多处理 - 管vs队列
在哪种情况下应该选择哪一种?什么时候使用Pipe()
有利?什么时候使用Queue()
有什么好处?
如果需要两个以上的点通信,使用Queue()
何时使用它们
。
如果您需要绝对性能,Pipe()
要快得多,因为Queue()
构建于Pipe()
之上。
性能基准
让我们假设你想产卵两个过程和它们之间快速发送邮件成为可能。这些是使用Pipe()
和Queue()
进行的类似测试之间的拖拽比赛的计时结果...这是在运行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上进行的。作为奖励,我投入了JoinableQueue()
的结果; JoinableQueue()
在调用queue.task_done()
时占用任务(它甚至不知道具体任务,它只是计算队列中未完成的任务),因此queue.join()
知道工作已完成。
每个在这个答案底部的代码...
[email protected]:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
[email protected]:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
[email protected]:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
[email protected]:~$
综上所述Pipe()
比Queue()
快约三倍。除非你真的必须得到好处,否则不要考虑JoinableQueue()
。
花絮2
多重介绍了信息流,使调试很难,除非你知道一些捷径微妙的变化。例如,在许多条件下通过字典进行索引时,您可能有一个脚本可以正常工作,但在某些输入情况下很少失败。
通常,当整个python进程崩溃时,我们会得到失败的线索;但是,如果多处理功能崩溃,则不会收到打印到控制台的未经请求的故障回溯。追踪未知的多重处理崩溃是很难的,而不知道是什么使程序崩溃。
我发现追查多进程崩溃情报的最简单方法是来包装整个多重功能在try
/except
和使用traceback.print_exc()
:
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
现在,当你发现一个崩溃你看到这样:
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(task_q, result_q)
File "foo.py", line 46, in run
raise ValueError
ValueError
源代码:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader(pipe):
output_p, input_p = pipe
input_p.close() # We are only reading
while True:
try:
msg = output_p.recv() # Read from the output pipe and do nothing
except EOFError:
break
def writer(count, input_p):
for ii in xrange(0, count):
input_p.send(ii) # Write 'count' numbers into the input pipe
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
output_p, input_p = Pipe()
reader_p = Process(target=reader, args=((output_p, input_p),))
reader_p.start() # Launch the reader process
output_p.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, input_p) # Send a lot of stuff to reader()
input_p.close() # Ask the reader to stop when it reads EOF
reader_p.join()
print "Sending %s numbers to Pipe() took %s seconds" % (count,
(time.time() - _start))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = Queue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print "Sending %s numbers to Queue() took %s seconds" % (count,
(time.time() - _start))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = JoinableQueue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
queue.join() # Wait for the reader to finish
print "Sending %s numbers to JoinableQueue() took %s seconds" % (count,
(time.time() - _start))
@Jonathan “总之管道()比快约三倍队列()” –
但管道()不能安全地与多个生产者使用/消费者。 –
非常好!好的答案,很好,你提供的基准!我只有两个小小的狡辩:(1)“数量级更快”有点夸张。差异是x3,大约是一个数量级的三分之一。只是说。 ;-); (2)更公平的比较将运行N个工作人员,每个工作人员通过点对点管道与主线程进行通信,而与运行N个工人的性能相比,这些工作人员都从单点到多点排队。 – JJC