我有两个线程(生产者和消费者),我与Queue
共享数据。问题是,当我强行中止生产者时,消费者有时会锁定。Python线程死锁
我在文档中看到用队列取消线程可能会破坏队列并导致死锁。我没有明确获取任何锁,但是阅读Queue.py的源文件时说put
和get
正在这样做。
请问,有谁知道它可能是这样的情况,当我中止线程时,它可能在get
/put
的中间,即使用锁定然后不释放它?我能做些什么呢?我有时需要过早地终止生产者。将使用进程,而不是线程,有什么区别?
我有两个线程(生产者和消费者),我与Queue
共享数据。问题是,当我强行中止生产者时,消费者有时会锁定。Python线程死锁
我在文档中看到用队列取消线程可能会破坏队列并导致死锁。我没有明确获取任何锁,但是阅读Queue.py的源文件时说put
和get
正在这样做。
请问,有谁知道它可能是这样的情况,当我中止线程时,它可能在get
/put
的中间,即使用锁定然后不释放它?我能做些什么呢?我有时需要过早地终止生产者。将使用进程,而不是线程,有什么区别?
很可能你的死锁是由于没有完成线程。如果你有linux下可以使用注射器从pyrasite打印回溯(你会知道你的程序挂在那里)
如果您正在使用的任何锁在信号处理程序 - 那么很可能这是你的僵局 (这是一个有点复杂,请询问是否需要说明)
创建进程而不是线程肯定会改变情况,但请记住任何数据交换和同步都非常复杂。
如何“完成”一个线程?我认为,当run()达到结束时就完成了。我自己也不使用任何信号处理程序,也许Queue.py呢? http://hg.python.org/cpython/file/2.7/Lib/Queue.py –
是的,如果run()到达结束然后它结束(对于优雅的主线程应该做“join()”) - >你可以添加调试信息,以确保你的线程(生产者/建筑师)结束,因为它可能挂在queue.get()或甚至queue.put()(!) – ddzialak
我认为这个“挂”正是发生了什么。问题是,我放弃了一个线程,所以它不会到达最后 - 我该怎么做?我的意思是,我需要放弃它,但队列似乎挂起..? –
也许这将帮助:
import threading
class MyQueue:
def __init__(self):
self.tasks = []
self.tlock = threading.Semaphore(0)
self.dlock = threading.Lock()
self.aborted = False
def put(self, arg):
try:
self.dlock.acquire()
self.tasks.append(arg)
finally:
self.dlock.release()
self.tlock.release()
def get(self):
if self.aborted:
return None
self.tlock.acquire()
if self.aborted:
self.tlock.release()
return None
try:
self.dlock.acquire()
if self.tasks:
return self.tasks.pop()
else: # executed abort
return None
finally:
self.dlock.release()
def abort(self):
self.aborted = True
self.tlock.release()
# TESTING
mq = MyQueue()
import sys
def tlog(line):
sys.stdout.write("[ %s ] %s\n" % (threading.currentThread().name, line))
sys.stdout.flush()
def reader():
arg = 1
while arg is not None:
tlog("start reading")
arg = mq.get()
tlog("read: %s" % arg)
tlog("END")
import time, random
def writer():
try:
pos = 1
while not mq.aborted:
x = random.random() * 5
tlog("writer sleep (%s)" % x)
pending = x
while pending > 0:
tosleep = min(0.5, pending)
if mq.aborted:
return
time.sleep(tosleep)
pending -= tosleep
tlog("write: %s" % x)
mq.put("POS %s val=%s" % (pos, x))
pos += 1
finally:
tlog("writer END")
def testStart():
try:
for i in xrange(9):
th = threading.Thread(None, reader, "reader %s" % i,(), {}, None)
th.start()
for i in xrange(3):
th = threading.Thread(None, writer, "writer %s" % i,(), {}, None)
th.start()
time.sleep(30) # seconds for testing
finally:
print "main thread: abort()"
mq.abort()
if __name__ == "__main__":
testStart()
非常感谢,但我需要中止线程如上所述,因为任务可能需要很长时间才能完成。 –
你如何中止线程?按Ctrl + C? –
像这样:http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python –
你可以发布一些代码,所以我们可以知道发生了什么? – betabandido