2012-05-25 192 views
2

我有两个线程(生产者和消费者),我与Queue共享数据。问题是,当我强行中止生产者时,消费者有时会锁定。Python线程死锁

我在文档中看到用队列取消线程可能会破坏队列并导致死锁。我没有明确获取任何锁,但是阅读Queue.py的源文件时说putget正在这样做。

请问,有谁知道它可能是这样的情况,当我中止线程时,它可能在get/put的中间,即使用锁定然后不释放它?我能做些什么呢?我有时需要过早地终止生产者。将使用进程,而不是线程,有什么区别?

+3

你如何中止线程?按Ctrl + C? –

+1

像这样:http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python –

+0

你可以发布一些代码,所以我们可以知道发生了什么? – betabandido

回答

0

很可能你的死锁是由于没有完成线程。如果你有linux下可以使用注射器从pyrasite打印回溯(你会知道你的程序挂在那里)

如果您正在使用的任何锁在信号处理程序 - 那么很可能这是你的僵局 (这是一个有点复杂,请询问是否需要说明)

创建进程而不是线程肯定会改变情况,但请记住任何数据交换和同步都非常复杂。

+0

如何“完成”一个线程?我认为,当run()达到结束时就完成了。我自己也不使用任何信号处理程序,也许Queue.py呢? http://hg.python.org/cpython/file/2.7/Lib/Queue.py –

+0

是的,如果run()到达结束然后它结束(对于优雅的主线程应该做“join()”) - >你可以添加调试信息,以确保你的线程(生产者/建筑师)结束,因为它可能挂在queue.get()或甚至queue.put()(!) – ddzialak

+0

我认为这个“挂”正是发生了什么。问题是,我放弃了一个线程,所以它不会到达最后 - 我该怎么做?我的意思是,我需要放弃它,但队列似乎挂起..? –

0

也许这将帮助:

import threading 

class MyQueue: 
    def __init__(self): 
     self.tasks = [] 
     self.tlock = threading.Semaphore(0) 
     self.dlock = threading.Lock() 
     self.aborted = False 

    def put(self, arg): 
     try: 
      self.dlock.acquire() 
      self.tasks.append(arg) 
     finally: 
      self.dlock.release() 
      self.tlock.release() 

    def get(self): 
     if self.aborted: 
      return None 
     self.tlock.acquire() 
     if self.aborted: 
      self.tlock.release() 
      return None 
     try: 
      self.dlock.acquire() 
      if self.tasks: 
       return self.tasks.pop() 
      else: # executed abort 
       return None 
     finally: 
      self.dlock.release() 

    def abort(self): 
     self.aborted = True 
     self.tlock.release() 

# TESTING 

mq = MyQueue() 
import sys 

def tlog(line): 
    sys.stdout.write("[ %s ] %s\n" % (threading.currentThread().name, line)) 
    sys.stdout.flush() 

def reader(): 
    arg = 1 
    while arg is not None: 
     tlog("start reading") 
     arg = mq.get() 
     tlog("read: %s" % arg) 
    tlog("END") 

import time, random 
def writer(): 
    try: 
     pos = 1 
     while not mq.aborted: 
      x = random.random() * 5 
      tlog("writer sleep (%s)" % x) 
      pending = x 
      while pending > 0: 
       tosleep = min(0.5, pending) 
       if mq.aborted: 
        return 
       time.sleep(tosleep) 
       pending -= tosleep 

      tlog("write: %s" % x) 
      mq.put("POS %s val=%s" % (pos, x)) 
      pos += 1 
    finally: 
     tlog("writer END") 

def testStart(): 
    try: 
     for i in xrange(9): 
      th = threading.Thread(None, reader, "reader %s" % i,(), {}, None) 
      th.start() 
     for i in xrange(3): 
      th = threading.Thread(None, writer, "writer %s" % i,(), {}, None) 
      th.start() 
     time.sleep(30) # seconds for testing 
    finally: 
     print "main thread: abort()" 
     mq.abort() 

if __name__ == "__main__": 
    testStart() 
+0

非常感谢,但我需要中止线程如上所述,因为任务可能需要很长时间才能完成。 –