2016-10-01 68 views
2

我想通过队列使一个分析器多线程。它似乎工作,但我的队列挂起。如果有人能告诉我如何解决这个问题,我会很感激,因为我很少写多线程代码。Python中的线程队列挂起

此代码从Q写着:

from silk import * 
import json 
import datetime 
import pandas 
import Queue 
from threading import Thread 

l = [] 
q = Queue.Queue() 

def parse_record(): 
    d = {} 
    while not q.empty(): 
     rec = q.get() 
     d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S") 
     # ... many ops like this 
     d['dport'] = rec.dport 
     l.append(d) # l is global 

这充满问:

def parse_records(): 
    ffile = '/tmp/query.rwf' 
    flows = SilkFile(ffile, READ) 
    numthreads = 2 

    # fill queue 
    for rec in flows: 
     q.put(rec) 
    # work on Queue  
    for i in range(numthreads): 
     t = Thread(target = parse_record) 
     t.daemon = True 
     t.start() 

    # blocking 
    q.join() 

    # never reached  
    data_df = pandas.DataFrame.from_records(l) 
    return data_df 

我只在我的主打电话parse_records()。它永远不会终止。

+1

作为一个方面说明,线程化可能会使其运行速度变慢。 python GIL只允许一次运行一个线程。 CPU绑定的工作人员不会并行运行。 – tdelaney

回答

2

Queue.empty doc说:

...如果是空的()返回FALSE,它并不能保证后续调用get()不会阻止。

至少应该使用get_nowait或风险数据丢失。但更重要的是,加入只会释放时,所有排队的项目都标记为已完成了Queue.task_done电话:

如果join()方法是目前拦截,当所有项目已处理完毕,就会恢复(意思每个已放入队列的项目都会收到一个task_done()调用。

作为附注,l.append(d)不是原子的,应该用锁保护。

from silk import * 
import json 
import datetime 
import pandas 
import Queue 
from threading import Thread, Lock 

l = [] 
l_lock = Lock() 
q = Queue.Queue() 

def parse_record(): 
    d = {} 
    while 1: 
     try: 
      rec = q.getnowait() 
      d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S") 
      # ... many ops like this 
      d['dport'] = rec.dport 
      with l_lock(): 
       l.append(d) # l is global 
      q.task_done() 
     except Queue.Empty: 
      return 

通过使用标准库中的线程池,可以大大缩短代码。

from silk import * 
import json 
import datetime 
import pandas 
import multiprocessing.pool 

def parse_record(rec): 
    d = {} 
    d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S") 
    # ... many ops like this 
    d['dport'] = rec.dport 
    return d 

def parse_records(): 
    ffile = '/tmp/query.rwf' 
    flows = SilkFile(ffile, READ) 
    pool = multiprocessing.pool.Pool(2) 
    data_df = pandas.DataFrame.from_records(pool.map(parse_record), flows) 
    pool.close() 
    return data_df 
+0

正确...这比较慢。但是我在Python中学到了很多关于多线程的知识。非常感谢你。 – wishi