在Python没有真正并发除了通过由于GIL多处理模块然后不是图片的一部分。
你想要做什么,需要一个事件循环在您检查事件队列和调度适当。 Pypubsub可能会让你的生活变得更轻松,但是对于你想要的东西来说可能是过度的(作为pubsub的作者,我感觉很舒服:) :)由于mp模块提供了多个进程的无缝集成,是否真的有一个不使用它的理由如果并发性真的是你需要的?
想要的事件从任何线程进入一个或多个线程的事实表明,你可以使用共享队列后,任何线程可以张贴到,该数据表明该事件类型和事件数据。此外,您还可以为每个线程发送一条消息队列:线程发布到共享帖子队列,主进程事件循环会检查帖子队列并根据需要将事件复制到各个线程消息队列。每个线程必须定期检查其队列并进行处理,以消除处理后的事件。每个线程都可以订阅特定事件的主进程。
这里是将消息发送到对方3个辅助线程的一个例子:
from multiprocessing import Process, Queue, Lock
from Queue import Empty as QueueEmpty
from random import randint
def log(lock, threadId, msg):
lock.acquire()
print 'Thread', threadId, ':', msg
lock.release()
def auxThread(id, lock, sendQueue, recvQueue, genType):
## Read from the queue
log(lock, id, 'starting')
while True:
# send a message (once in a while!)
if randint(1,10) > 7:
event = dict(type = genType, fromId = id, val = randint(1, 10))
log(lock, id, 'putting message type "%(type)s" = %(val)s' % event)
sendQueue.put(event)
# block until we get a message:
maxWait = 1 # second
try:
msg = recvQueue.get(False, maxWait)
log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg)
if (msg['val'] == 'DONE'):
break
except QueueEmpty:
pass
log(lock, id, 'done')
def createThread(id, lock, postOffice, genType):
messagesForAux = Queue()
args = (id, lock, postOffice, messagesForAux, genType)
auxProc = Process(target=auxThread, args=args)
auxProc.daemon = True
return dict(q=messagesForAux, p=auxProc, id=id)
def mainThread():
postOffice = Queue() # where all threads post their messages
lock = Lock() # so print can be synchronized
# setup threads:
msgThreads = [
createThread(1, lock, postOffice, 'heartbeat'),
createThread(2, lock, postOffice, 'new_socket'),
createThread(3, lock, postOffice, 'keypress'),
]
# identify which threads listen for which messages
dispatch = dict(
heartbeat = (2,),
keypress = (1,),
new_socket = (3,),
)
# start all threads
for th in msgThreads:
th['p'].start()
# process messages
count = 0
while True:
try:
maxWait = 1 # second
msg = postOffice.get(False, maxWait)
for threadId in dispatch[msg['type']]:
thObj = msgThreads[threadId - 1]
thObj['q'].put(msg)
count += 1
if count > 20:
break
except QueueEmpty:
pass
log(lock, 0, "Main thread sending exit signal to aux threads")
for th in msgThreads:
th['q'].put(dict(type='command', val='DONE', fromId=0))
for th in msgThreads:
th['p'].join()
log(lock, th['id'], 'joined main')
log(lock, 0, "DONE")
if __name__ == '__main__':
mainThread()
你是完全正确的,这说明股pypubsub功能相似,但你会只使用pypubsub的一小部分,我认为你努力中的大部分复杂性是两种类型的队列,pypubsub不会帮助解决问题。一旦你有使用mp模块工作的队列系统(按照我的例子),你可以引入pypubsub并发布/排队它的消息,而不是你自己植入的事件。
“暂停整个过程(所有线程都在其中)” - 您是指GIL还是导致“暂停”的其他内容? – shx2