2017-04-25 36 views
1

我在使用回调函数和处理程序的Python中出现了一个奇怪的现象。 我使用ZMQ来处理我的通信并为套接字使用流。我有基类:回调函数在实例中看不到正确的值

import multiprocessing  
import zmq 
from concurrent.futures import ThreadPoolExecutor 
from zmq.eventloop import ioloop, zmqstream 
from zmq.utils import jsonapi as json 

# Types of messages 
TYPE_A = 'type_a' 
TYPE_B = 'type_b' 


class ZmqProcess(multiprocessing.Process): 
    def __init__(self): 
     super(ZmqProcess, self).__init__() 
     self.context = None 
     self.loop = None 
     self.handle_stream = None 

    def setup(self): 
     self.context = zmq.Context() 
     self.loop = ioloop.IOLoop.instance() 

    def send(self, msg_type, msg, host, port): 
     sock = zmq.Context().socket(zmq.PAIR) 
     sock.connect('tcp://%s:%s' % (host, port)) 
     sock.send_json([msg_type, msg]) 

    def stream(self, sock_type, addr): 
     sock = self.context.socket(sock_type) 
      if isinstance(addr, str): 
      addr = addr.split(':') 
     host, port = addr if len(addr) == 2 else (addr[0], None) 
      if port: 
      sock.bind('tcp://%s:%s' % (host, port)) 
     else: 
      port = sock.bind_to_random_port('tcp://%s' % host) 
     stream = zmqstream.ZMQStream(sock, self.loop)  
     return stream, int(port) 

class MessageHandler(object): 
    def __init__(self, json_load=-1): 
     self._json_load = json_load 
     self.pool = ThreadPoolExecutor(max_workers=10) 

    def __call__(self, msg): 
     i = self._json_load 
     msg_type, data = json.loads(msg[i]) 
     msg[i] = data 
     if msg_type.startswith('_'): 
      raise AttributeError('%s starts with an "_"' % msg_type) 
     getattr(self, msg_type)(*msg) 

而且我有一个类,从它继承:

import zmq  
import zmq_base  

class ZmqServerMeta(zmq_base.ZmqProcess): 
    def __init__(self, bind_addr, handlers): 
     super(ZmqServerMeta, self).__init__() 
     self.bind_addr = bind_addr 
     self.handlers = handlers 

    def setup(self): 
     super(ZmqServerMeta, self).setup() 
     self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr) 
     self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop, 
               self.handlers)) 

    def run(self): 
     self.setup() 
     self.loop.start() 

    def stop(self): 
     self.loop.stop() 

class StreamHandler(zmq_base.MessageHandler): 
    def __init__(self, handle_stream, stop, handlers): 
     super(StreamHandler, self).__init__() 
     self._handle_stream = handle_stream 
     self._stop = stop 
     self._handlers = handlers 

    def type_a(self, data): 
     if zmq_base.TYPE_A in self._handlers: 
      if self._handlers[zmq_base.TYPE_A]: 
       for handle in self._handlers[zmq_base.TYPE_A]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def type_b(self, data): 
     if zmq_base.TYPE_B in self._handlers: 
      if self._handlers[zmq_base.TYPE_B]: 
       for handle in self._handlers[zmq_base.TYPE_B]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def endit(self): 
     self._stop() 

此外,我有我想作为存储使用的类。这里是麻烦就来了:

import threading 
import zmq_server_meta as server 
import zmq_base as base 


class Storage: 
    def __init__(self): 
     self.list = [] 

     self.list_lock = threading.RLock() 

     self.zmq_server = None 
     self.host = '127.0.0.1' 
     self.port = 5432 
     self.bind_addr = (self.host, self.port) 

    def setup(self): 
     handlers = {base.TYPE_A: [self. remove]} 
     self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr) 
     self.zmq_server.start() 

    def add(self, data): 
     with self.list_lock: 
      try: 
       self.list.append(data) 
      except: 
       print "Didn't work" 

    def remove(self, msg): 
     with self.list_lock: 
      try: 
       self.list.remove(msg) 
      except: 
       print "Didn't work" 

的想法是,那类存储一些全局它接受到的信息。 它是在一个文件中的所有开始测试:

import sys 
import time 
import storage 
import zmq_base as base 
import zmq_server_meta as server 



def printMsg(msg): 
    print msg 

store = storage.Storage() 

store.setup() 
handlers = {base.TYPE_B: [printMsg]} 
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431)) 
client.start() 

message = "Test" 

store.add(message) 
client.send(base.TYPE_A, message, '127.0.0.1', 5432) 

我简化它以减少混乱。它不是仅仅添加它,而是通常发送然后回复。响应,客户端发送,应该通过正确的回调remove()来处理,并且它应该从列表中删除某些内容。发生的问题是,remove()函数会看到一个空列表,尽管列表中应该有一个元素。如果从测试文件中检查,我可以在添加元素后看到该元素,如果从那里调用remove(),则会看到一个非空列表并可以将其删除。我的问题是,为什么回调看到一个空的列表,我如何确保它看到列表中的正确元素?

亲切的问候 帕特里克

回答

1

我相信,该ZmqProcess类从multiprocessing.Process继承的事实问题奠定。多重不允许不同的进程间共享的对象,除了通过使用值或阵列的共享存储器映射(如可以在文档中可以看出:https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes

如果你想使用自定义的对象,你可以使用服务器进程/代理对象,可以在文档的同一页面找到。

所以,你可以,例如,在存储类等的初始化函数定义经理:self.manager = Manager()然后你把self.list = self.manager.list()。这应该做的伎俩。

相关问题