2015-05-08 36 views
3

我想使用2个线程遍历列表。一个来自领先,另一个来自尾随,并在每次迭代中将元素放在Queue中。但在将值放入Queue之前,我需要检查Queue(当其中一个线程将该值放在Queue中时)的值是否存在,所以当发生这种情况时,我需要停止线程并返回遍历值的列表为每个线程。多线程检查队列中的成员并停止线程

这是我到目前为止已经试过:

from Queue import Queue 
from threading import Thread, Event 

class ThreadWithReturnValue(Thread): 
    def __init__(self, group=None, target=None, name=None, 
       args=(), kwargs={}, Verbose=None): 
     Thread.__init__(self, group, target, name, args, kwargs, Verbose) 
     self._return = None 
    def run(self): 
     if self._Thread__target is not None: 
      self._return = self._Thread__target(*self._Thread__args, 
               **self._Thread__kwargs) 
    def join(self): 
     Thread.join(self) 
     return self._return 

main_path = Queue() 

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

def a(main_path,g,l=[]): 
    for i in g: 
    l.append(i) 
    print 'a' 
    if is_in_queue(i,main_path): 
     return l 
    main_path.put(i) 

def b(main_path,g,l=[]): 
    for i in g: 
    l.append(i) 
    print 'b' 
    if is_in_queue(i,main_path): 
     return l 
    main_path.put(i) 

g=['a','b','c','d','e','f','g','h','i','j','k','l'] 

t1 = ThreadWithReturnValue(target=a, args=(main_path,g)) 
t2 = ThreadWithReturnValue(target=b, args=(main_path,g[::-1])) 
t2.start() 
t1.start() 
# Wait for all produced items to be consumed 
print main_path.join() 

我用ThreadWithReturnValue,这将创建一个返回值自定义线程。

并在成员检查我用下面的功能:

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

现在,如果我第一次启动t1然后t2我会得到12 a然后一个b那么它不会做任何事情,我需要手动终止python!

但是,如果我先运行t2然后t1我会得到以下结果:

b 
b 
b 
b 
ab 

ab 
b 

b 
b 
b 
a 
a 

所以我的问题是,为什么蟒蛇胎面在这种情况下,有什么不同?以及如何终止线程并使它们彼此通信?

+0

看看这里http://pymotw.com/2/multiprocessing/communication.html ...你更感兴趣的是管理共享状态 – OWADVL

+0

@OWADVL声音有用,我会看到的!谢谢! – Kasramvd

+0

您是否有从列表两端进行迭代的实际要求,或者您是否只是将其作为划分任务的一种方式? – 101

回答

2

在我们进入更大的问题,你不使用Queue.join权。

该函数的重点在于,将一堆物品添加到队列中的生产者可以等到消费者或消费者完成所有这些项目的工作。这可以通过让消费者在完成与get之间的每个项目的完成工作后致电task_done。一旦有多少task_done调用作为put调用,队列就完成了。你不是在任何地方做get,更不用说task_done,所以队列无法完成。所以,这就是为什么在两个线程完成后永远阻塞。


这里的第一个问题是你的线程在实际同步之外几乎没有工作。如果他们所做的唯一的事情是通过一个队列进行战斗,那么他们中只有一个能够一次运行。

当然,这是在玩具问题常见,但你必须考虑自己真正的问题:

  • 如果你正在做大量的I/O工作(监听套接字,等待用户输入,等等),线程效果很好。
  • 如果你正在做大量的CPU工作(计算素数),线程不能在Python中工作,因为GIL,但过程。
  • 如果你实际上主要是处理同步单独的任务,那么任何一个都不会很好地工作(并且进程会变得更糟)。它可能仍然是简单以线程方式思考,但它会是做事最慢的方式。你可能想看看协程; Greg Ewing有一个great demonstration关于如何使用yield from来使用协程来构建诸如调度程序或许多参与者模拟的东西。

接下来,正如我提到在你前面的问题,使得线程(或进程)共享状态的有效工作需要持有尽可能短的时间尽可能的锁。

所以,如果你不得不在一个锁下搜索整个队列,那最好是一个常量时间搜索,而不是线性时间搜索。这就是为什么我建议使用诸如OrderedSet配方而不是list的配方,比如stdlib的Queue.Queue中的配方。那么这个功能:

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

...只阻塞队列中的一小部分第二,足够长的时间来查找表中的哈希值,而不是足够长,每个元素在队列中对比较x


最后,我试着解释你的其他问题的竞争条件,但让我再试一次。

您需要锁定代码中的每个完整的“事务”,而不是围绕单个操作。

例如,如果你这样做:

with queue locked: 
    see if x is in the queue 
if x was not in the queue: 
    with queue locked: 
     add x to the queue 

...那么它总是可能的,则x不在队列中,当您检查,但在当你解锁并重新锁定它的时间,有人加入它。这正是为什么两个线程都可能提前停下来的原因。

要解决这个问题,你需要把一个锁周围的整个事情:

with queue locked: 
    if x is not in the queue: 
     add x to the queue 

当然这正好直接对我说之前有关锁定队列尽可能短的时间尽可能。真的,这就是多线程技术简单化的原因。编写安全的代码很容易,只要可能是必要的,就可以锁定所有内容,但是随后代码只会使用单个内核,而其他所有线程都会被阻塞,等待锁定。编写快速代码很简单,只需尽可能简短地锁定所有内容,但这样就不安全,并且会得到垃圾数据甚至崩溃。弄清楚需要做什么事情,以及如何最大限度地减少这些事务内部的工作,以及如何处理多个锁,这些锁可能需要在不造成死锁的情况下完成工作......这并不容易。

+0

非常感谢@abarnert花时间和完整的解释!你澄清我的一些小姐的理解,其实我在多处理中很糟糕,我想我需要更多的学习! :) – Kasramvd

+1

@卡斯拉:有一个原因“共享内存多线程很难”是一个老生常谈。每个人都可怕,因为我们的直觉是错误的(至少在从语言级别设计到CPU微码级别的系统上,以优化单一处理并且不能改变)。尽可能使用更高级别的抽象(消息传递,而不是共享内存,STM等),当不是时,您将逐渐感觉到什么时候忽略了您的直觉并严格地完成了工作(和/或测试)会发生什么,但是你仍然会做出难以调试的错误... – abarnert

+0

是的,在我的问题中有很多场景!我正在慢慢的工作,我想深深地学习它!正如你所说的“共享内存多线程很难”是一种陈词滥调,对我来说也是如此,但我喜欢硬件,我会努力! – Kasramvd

2

几件事情,我认为可以改进:

  1. 由于GIL,你可能想使用multiprocessing(而不是threading)模块。通常,CPython线程不会导致CPU密集型工作加速。 (根据问题的背景,multiprocessing也可能不会,但threading几乎肯定不会。)
  2. 像您的is_inqueue这样的函数可能会导致较高的争用。

锁定的时间似乎线性在需要遍历的项目数:

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

所以,相反,你可能做到以下几点。

使用multiprocessing与共享dict

from multiprocessing import Process, Manager 

manager = Manager() 
d = manager.dict() 

# Fn definitions and such 

p1 = Process(target=p1, args=(d,)) 
p2 = Process(target=p2, args=(d,)) 
每个函数内

,检查这样的项目:

def p1(d): 

    # Stuff 

    if 'foo' in d: 
     return