2014-02-13 35 views
3

我开发Python中的多线程应用事件。特别是,在这个应用程序中,一个线程应该能够生成一个应该被通知给一个(或多个)线程的事件;接收事件通知的线程应该中断它们的执行并运行特定的功能。在这个服务功能结束时,他们应该回去做他们在事件产生之前所做的事情。的Python:线程管理由其他线程通知

为了做这样的事情,我正在考虑使用某种发布/订阅模块。我发现一个很容易使用的:PyPubSub。你可以找到here关于如何使用它的一个非常简单的例子。

顺便说一句,当我开始用这个,我意识到它做了什么,我一直在寻找,但只有当你与过程只是工作。如果你有更多的线程,它会暂停整个过程(所有线程)来运行特定的例程。这实际上并不是我正在寻找的行为。不幸的是,我无法将我的应用程序从多线程更改为多进程。

你知道的任何模块,可以帮助我做什么,我想在多线程应用程序呢?谢谢。

+0

“暂停整个过程(所有线程都在其中)” - 您是指GIL还是导致“暂停”的其他内容? – shx2

回答

3

在Python没有真正并发除了通过由于GIL多处理模块然后不是图片的一部分。

你想要做什么,需要一个事件循环在您检查事件队列和调度适当。 Pypubsub可能会让你的生活变得更轻松,但是对于你想要的东西来说可能是过度的(作为pubsub的作者,我感觉很舒服:) :)由于mp模块提供了多个进程的无缝集成,是否真的有一个不使用它的理由如果并发性真的是你需要的?

想要的事件从任何线程进入一个或多个线程的事实表明,你可以使用共享队列后,任何线程可以张贴到,该数据表明该事件类型和事件数据。此外,您还可以为每个线程发送一条消息队列:线程发布到共享帖子队列,主进程事件循环会检查帖子队列并根据需要将事件复制到各个线程消息队列。每个线程必须定期检查其队列并进行处理,以消除处理后的事件。每个线程都可以订阅特定事件的主进程。

这里是将消息发送到对方3个辅助线程的一个例子:

from multiprocessing import Process, Queue, Lock 
from Queue import Empty as QueueEmpty 
from random import randint 


def log(lock, threadId, msg): 
    lock.acquire() 
    print 'Thread', threadId, ':', msg 
    lock.release() 


def auxThread(id, lock, sendQueue, recvQueue, genType): 
    ## Read from the queue 
    log(lock, id, 'starting') 
    while True: 
     # send a message (once in a while!) 
     if randint(1,10) > 7: 
      event = dict(type = genType, fromId = id, val = randint(1, 10)) 
      log(lock, id, 'putting message type "%(type)s" = %(val)s' % event) 
      sendQueue.put(event) 

     # block until we get a message: 
     maxWait = 1 # second 
     try: 
      msg = recvQueue.get(False, maxWait) 
      log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg) 
      if (msg['val'] == 'DONE'): 
       break 
     except QueueEmpty: 
      pass 

    log(lock, id, 'done') 


def createThread(id, lock, postOffice, genType): 
    messagesForAux = Queue() 
    args = (id, lock, postOffice, messagesForAux, genType) 
    auxProc = Process(target=auxThread, args=args) 
    auxProc.daemon = True 
    return dict(q=messagesForAux, p=auxProc, id=id) 


def mainThread(): 
    postOffice = Queue() # where all threads post their messages 
    lock = Lock() # so print can be synchronized 

    # setup threads: 
    msgThreads = [ 
     createThread(1, lock, postOffice, 'heartbeat'), 
     createThread(2, lock, postOffice, 'new_socket'), 
     createThread(3, lock, postOffice, 'keypress'), 
    ] 

    # identify which threads listen for which messages 
    dispatch = dict(
     heartbeat = (2,), 
     keypress = (1,), 
     new_socket = (3,), 
    ) 

    # start all threads 
    for th in msgThreads: 
     th['p'].start() 

    # process messages 
    count = 0 
    while True: 
     try: 
      maxWait = 1 # second 
      msg = postOffice.get(False, maxWait) 
      for threadId in dispatch[msg['type']]: 
       thObj = msgThreads[threadId - 1] 
       thObj['q'].put(msg) 
      count += 1 
      if count > 20: 
       break 

     except QueueEmpty: 
      pass 

    log(lock, 0, "Main thread sending exit signal to aux threads") 
    for th in msgThreads: 
     th['q'].put(dict(type='command', val='DONE', fromId=0)) 

    for th in msgThreads: 
     th['p'].join() 
     log(lock, th['id'], 'joined main') 
    log(lock, 0, "DONE") 


if __name__ == '__main__': 
    mainThread() 

你是完全正确的,这说明股pypubsub功能相似,但你会只使用pypubsub的一小部分,我认为你努力中的大部分复杂性是两种类型的队列,pypubsub不会帮助解决问题。一旦你有使用mp模块工作的队列系统(按照我的例子),你可以引入pypubsub并发布/排队它的消息,而不是你自己植入的事件。

+0

事实上,我的意图是让我的线程做一些事情,并在他们进行活动时打断他们;所以,我并没有真正考虑事件循环。但也许改变我的程序的结构,可以工作。 And ...你将如何创建事件循环?只是一个条件为True的while循环,检查是否有新事件? 无论如何,无论您可以发布什么代码来阐明您在邮件中写的内容都会很棒。 – Cell

+0

我不知道有任何中断非主线程的方法(即使信号模块只向主线程发送信号;并且仅在Unix上工作)。我以工作示例hth扩展。 – Schollii