2010-07-06 39 views
0

我想用线程来处理流输入。python线程和队列无限数据输入(流)

如何使下面的代码为无限输入产生,例如通过使用itertools.count

下面的代码会工作,如果: “对于i在itertools.count():”被替换为“的我的xrange(5):”

from threading import Thread 
from Queue import Queue, Empty 
import itertools 

def do_work(q): 
    while True: 
    try: 
     x = q.get(block=False) 
     print (x) 
    except Empty: 
     break 

if __name__ == "__main__": 
    work_queue = Queue() 
    for i in itertools.count(): 
    work_queue.put(i) 

    threads = [Thread(target=do_work, args=(work_queue,)) for i in range(8)] 

    for t in threads: t.start() 
    for t in threads: t.join() 
+0

如何让下面的代码做*什么*? – 2010-07-10 14:42:10

回答

2

问题是,itertools.count生成无限序列。这意味着for循环将永远不会结束。你应该把它放在它自己的函数中,并使它成为一个单独的线程。这样,当工作线程从队列中获取数据时,您将拥有队列增长。

+0

你是对的无限循环。我也是如此。但是,当我将itertools放入它自己的线程时,它给了我一个运行时错误。请张贴一些代码。 – Joey 2010-07-06 10:33:09

+0

@Joey:什么错误? – MattH 2010-07-06 10:38:30

+0

运行时错误Visual C++ 此应用程序请求运行时以非常规方式终止它。 – Joey 2010-07-06 10:40:22

1

也许我失去了一些东西,但不是那么简单,创建和for循环之前开始线程?

另外,当你没有工作时,你的线程终止似乎是一个坏主意,因为未来可能会有更多的工作显示出来。当然,你希望他们阻止,直到有些工作可用?

+0

你对'do_work'中的'break'做了一个很好的说明。除非队列缓冲了足够数量的数据,否则工作线程可能会在更多信息放入队列之前全部终止。 – unholysampler 2010-07-06 10:24:55

+0

不是真的为我工作。请你可以发帖代码 – Joey 2010-07-06 10:29:50

+0

@Joey这听起来像一个家庭作业问题。如果是这样,请标记为这样。 – jchl 2010-07-06 10:34:27

1

您需要用线程填充队列。您需要管理队列大小。特别是如果工人们花时间处理物品。您需要标记已完成的队列项目。如果这与你关于twitter和“非常快速”输入的其他问题有关,那么就数据库插入而言,你有更多的工作要做。

对于相当复杂的话题,您的问题太模糊了。即使你想要知道这并不容易,你似乎还不够了解。我建议你对你想要做的事情有一些更具体的了解。

下面是一个填充和使用线程队列的例子。队列大小未被管理。

from threading import Thread 
from Queue import Queue, Empty, Full 
import itertools 
from time import sleep 


def do_work(q,wkr): 
    while True: 
    try: 
     x = q.get(block=True,timeout=10) 
     q.task_done() 
     print "Wkr %s: Consuming %s" % (wkr,x) 
     sleep(0.01) 
    except Empty: 
     print "Wkr %s exiting, timeout/empty" % (wkr) 
     break 
    sleep(0.01) 

def fill_queue(q,limit=1000): 
    count = itertools.count() 
    while True: 
    n = count.next() 
    try: 
     q.put(n,block=True,timeout=10) 
    except Full: 
     print "Filler exiting, timeout/full" 
     break 
    if n >= limit: 
     print "Filler exiting, reached limit - %s" % limit 
     break 
    sleep(0.01) 

work_queue = Queue() 

threads = [Thread(target=do_work, args=(work_queue,i)) for i in range(2)] 
threads.insert(0,Thread(target=fill_queue,args=(work_queue,100))) 

for t in threads: 
    t.start() 

for t in threads: 
    t.join() 

Wkr 0: Consuming 0 
Wkr 1: Consuming 1 
Wkr 0: Consuming 2 
Wkr 1: Consuming 3 
.... 
Wkr 1: Consuming 99 
Filler exiting, reached limit - 100 
Wkr 0: Consuming 100 
Wkr 1 exiting, timeout/empty 
Wkr 0 exiting, timeout/empty