2014-10-04 281 views
1

当使用multiprocessing.JoinableQueue产生进程的线程时,我得到BrokenPipeError。这似乎发生在程序完成工作并尝试退出之后,因为它确实是应该做的。这是什么意思,有没有办法解决这个/安全忽略?Python多线程+多处理BrokenPipeError(子进程不退出?)

import requests 
import multiprocessing 
from multiprocessing import JoinableQueue 
from queue import Queue 
import threading 


class ProcessClass(multiprocessing.Process): 
    def __init__(self, func, in_queue, out_queue): 
     super().__init__() 
     self.in_queue = in_queue 
     self.out_queue = out_queue 
     self.func = func 

    def run(self): 
     while True: 
      arg = self.in_queue.get() 
      self.func(arg, self.out_queue) 
      self.in_queue.task_done() 


class ThreadClass(threading.Thread): 
    def __init__(self, func, in_queue, out_queue): 
     super().__init__() 
     self.in_queue = in_queue 
     self.out_queue = out_queue 
     self.func = func 

    def run(self): 
     while True: 
      arg = self.in_queue.get() 
      self.func(arg, self.out_queue) 
      self.in_queue.task_done() 


def get_urls(host, out_queue): 
    r = requests.get(host) 
    out_queue.put(r.text) 
    print(r.status_code, host) 


def get_title(text, out_queue): 
    print(text.strip('\r\n ')[:5]) 


if __name__ == '__main__': 
    def test(): 

     q1 = JoinableQueue() 
     q2 = JoinableQueue() 

     for i in range(2): 
      t = ThreadClass(get_urls, q1, q2) 
      t.daemon = True 
      t.setDaemon(True) 
      t.start() 

     for i in range(2): 
      t = ProcessClass(get_title, q2, None) 
      t.daemon = True 
      t.start() 

     for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",): 
      q1.put(host) 

     q1.join() 
     q2.join() 

    test() 
    print('Finished') 

程序输出:

200 http://ibm.com 
<!DOC 
200 http://google.com 
<!doc 
200 http://yahoo.com 
<!DOC 
200 http://apple.com 
<!DOC 
200 http://amazon.com 
<!DOC 
Finished 
Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "C:\Python\33\lib\multiprocessing\connection.py", line 313, in _recv_bytes 
    nread, err = ov.GetOverlappedResult(True) 
BrokenPipeError: [WinError 109] 

The pipe has been ended 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "C:\Python\33\lib\threading.py", line 901, in _bootstrap_inner 
    self.run() 
    File "D:\Progs\Uspat\uspat\spider\run\threads_test.py", line 31, in run 
    arg = self.in_queue.get() 
    File "C:\Python\33\lib\multiprocessing\queues.py", line 94, in get 
    res = self._recv() 
    File "C:\Python\33\lib\multiprocessing\connection.py", line 251, in recv 
    buf = self._recv_bytes() 
    File "C:\Python\33\lib\multiprocessing\connection.py", line 322, in _recv_bytes 
    raise EOFError 
EOFError 
.... 

(切成同样的错误其他线程。)

如果我切换到JoinableQueue为queue.Queue多线程的一部分,一切修复,但为什么呢?

回答

2

发生这种情况,因为你要离开了后台线程在multiprocessing.Queue.get调用主线程退出时阻塞,但它只能在一定的条件下发生的:

  1. 守护线程运行和阻塞上multiprocessing.Queue.get当主线程退出时。
  2. A multiprocessing.Process正在运行。
  3. multiprocessing上下文不是'fork'

唯一的例外是告诉你的是,Connection的另一端,该multiprocessing.JoinableQueue是听时,其一个get()调用内发送的EOF。通常这意味着Connection的另一端已关闭。在关闭期间发生这种情况是有道理的 - 在退出解释器之前,Python正在清理所有对象,部分清理操作包括关闭所有打开的Connection对象。我还没有弄清楚的是,为什么只有(并且总是)如果产生了multiprocessing.Process而不是分叉,这就是为什么默认情况下它不会在Linux上发生)并且仍在运行。我甚至可以重现它,如果我创建一个multiprocessing.Process,只是睡在while循环。它根本不需要任何Queue对象。无论出于何种原因,运行中产生的子进程似乎都能保证异常会被提出。它可能只是简单地导致事情被破坏的顺序恰好适合竞争条件的发生,但这是一种猜测。

在任何情况下,使用的queue.Queue,而不是multiprocessing.JoinableQueue是修复的好方法,因为你实际上并不需要multiprocessing.Queue那里。您还可以通过发送标记到队列中来确保后台线程和/或后台进程在主线程之前关闭。因此,使双方run方法检查前哨:

def run(self): 
    for arg in iter(self.in_queue.get, None): # None is the sentinel 
     self.func(arg, self.out_queue) 
     self.in_queue.task_done() 
    self.in_queue.task_done() 

,然后发送哨兵当你做:

threads = [] 
    for i in range(2): 
     t = ThreadClass(get_urls, q1, q2) 
     t.daemon = True 
     t.setDaemon(True) 
     t.start() 
     threads.append(t) 

    p = multiprocessing.Process(target=blah) 
    p.daemon = True 
    p.start() 
    procs = [] 
    for i in range(2): 
     t = ProcessClass(get_title, q2, None) 
     t.daemon = True 
     t.start() 
     procs.append(t) 

    for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",): 
     q1.put(host) 

    q1.join() 
    # All items have been consumed from input queue, lets start shutting down. 
    for t in procs: 
     q2.put(None) 
     t.join() 
    for t in threads: 
     q1.put(None) 
     t.join() 
    q2.join() 
+0

谢谢你一个完整的答案(一个侧面说明,或许有帮助的人:我想使用multiprocessing.JoinableQueue而不是queue.Queue也能够将来自多处理部分的参数传递回应用程序的多线程部分,尽管在上例中没有这样的代码) – Bob 2014-10-04 06:35:56