2012-01-24 47 views
3

我有一个应用程序,它实现了像Python中的责任链。有一个通过multiprocessing.Queue()将对象传递给其他进程然后对对象执行操作的进程。对传递的对象的最后修改时间进行跟踪也很重要,因此只有在修改对象时才能执行操作。Python多处理.Queue修改对象

我遇到的问题是,对象中的_modified属性在从队列中提取后会随机更改。但是_mtime属性总是正确的。下面的示例将运行并(有意)随机修改DummyObject,然后将其放置在每个处理程序进程的队列中。然后每个处理程序将打印它们在对象中收到的_modified和_mtime值。我期望_modified值在command_func和处理函数中都是相同的,但通常情况并非如此。如果我从DummyObject中删除Object_w_mtime继承,那么在发送和接收的对象中看不到任何差异。

我对python比较陌生。就我所知,应该发生的事情是每次将一个对象放在队列中,将其腌制,然后通过管道发送到接收过程,从而取消对该对象的反应。那是对的吗?有没有什么方法可以在对象被腌渍/取消时对象继承会被搞砸?

我在Ubuntu 11.10上用Python 2.7.2和2.6.7以及Ubuntu 11.04上的python 2.7.1测试了它。有时你必须让它运行一分钟左右才能看到行为,因为它看起来是随机的。

在这里抓住吸管,在此先感谢。

import multiprocessing 
import time 
import traceback 
import os 
import random 

class Object_w_mtime(object): 
    ''' 
    Parent object that tracks the last time an attribute was modified 
    ''' 
    def __setattr__(self,a_name,a_value): 
     if ((a_name not in ('_mtime','_modified')) and 
      (a_value != getattr(self,a_name,None)) 
     ): 
      object.__setattr__(self, '_modified', True) 
      object.__setattr__(self, '_mtime', time.time()) 
     object.__setattr__(self, a_name, a_value) 
     return True 
    #END def 

    def reset(self): 
     self._modified = False 
#END class 

class DummyObject(Object_w_mtime): 
    def __init__(self): 
     self.value = 10 

def handler(in_queue = None, handler_id = None): 
    print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id) 
    while True: 
     try: 
      obj = in_queue.get(True,61) 
      print 'handler{} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime 
     except multiprocessing.queues.Empty: 
      break 
     except KeyboardInterrupt: 
      break 
     except Exception as e: 
      print traceback.format_exc() 
      break 
    return True 
#END def 

def command_func(next_links = None): 
    print 'PID:' + str(os.getpid()) + ':command_func:<RUN>' 
    obj = DummyObject() 
    while True: 
     try: 
      # randomly assign a different value to test with a modified and unmodified object 
      obj.value = random.randint(0,1) 
      print '**************** obj.value = {0} ***************'.format(obj.value) 
      print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime 
      for each in next_links: 
       each.put(obj,False) 
     except multiprocessing.queues.Empty: 
      break 
     except KeyboardInterrupt: 
      break 
     except Exception as e: 
      print e 
      print traceback.format_exc() 
      break 
     obj.reset() 
     time.sleep(3) 
    return True 
#END def 


if __name__ == '__main__': 
    handler_queues = list() 
    handler_processes = list() 
    # Create a queue and process object for each command handler 
    for handler_id in range(1,4): 
     queue = multiprocessing.Queue() 
     process = multiprocessing.Process(target=handler, args=(queue, handler_id)) 
     handler_queues.append(queue) 
     handler_processes.append(process) 

    try: 
     # spawn handler processes 
     for process in handler_processes: 
      process.start() 
     # Start sending commands to handlers 
     command_func(handler_queues) 

    # exit on keyboard interrupt 
    except KeyboardInterrupt: 
     for process in handler_processes: 
      process.join() 
    except Exception: 
     traceback.print_exc() 

回答

5

简而言之,您在将其放入队列后修改obj

查看http://svn.python.org/view/python/trunk/Lib/multiprocessing/queues.py?revision=76434&view=markup第285行,put()仅将对象放置在内部队列中,如果尚未运行,则启动后台线程以处理来自该队列的对象。因此,代码中的each.put(obj,False)obj.reset()之间存在竞争。

您应该只能使用具有不可变(副本)对象的队列。

+0

谢谢,这完美地解释了这个问题。 –