2012-03-19 30 views
2

您好我有一些问题包装Python类中的一些ZMQ拉客户端。这些类被实例化并通过多处理模块在子进程中调用。当客户端功能全部有效时,但是当它们是类时,poller.poll()会挂起。ZMQ轮询不在类实例中工作

代码波纹管有两个版本:一个可以工作,另一个不可以。为什么?

import zmq 
import time 
import sys 
import random 
from multiprocessing import Process 

def server_push(port="5556"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUSH) 
    socket.bind("tcp://*:%s" % port) 
    print "Running server on port: ", port 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     if reqnum < 6: 
      socket.send("Continue") 
     else: 
      socket.send("Exit") 
      break 
     time.sleep (1) 

def server_pub(port="5558"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUB) 
    socket.bind("tcp://*:%s" % port) 
    publisher_id = random.randrange(0,9999) 
    print "Running server on port: ", port 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     # Wait for next request from client 
     topic = random.randrange(8,10) 
     messagedata = "server#%s" % publisher_id 
     print "%s %s" % (topic, messagedata) 
     socket.send("%d %s" % (topic, messagedata)) 
     time.sleep(1)  


class Client: 
    def __init__(self,port_push, port_sub): 
     context = zmq.Context() 
     self.socket_pull = context.socket(zmq.PULL) 
     self.socket_pull.connect ("tcp://localhost:%s" % port_push) 
     print "Connected to server with port %s" % port_push 
     self.socket_sub = context.socket(zmq.SUB) 
     self.socket_sub.connect ("tcp://localhost:%s" % port_sub) 
     self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
     print "Connected to publisher with port %s" % port_sub 
     # Initialize poll set 


    def __call__(self): 
     poller = zmq.Poller() 
     poller.register(self.socket_pull, zmq.POLLIN) 
     poller.register(self.socket_sub, zmq.POLLIN) 
     # Work on requests from both server and publisher 
     should_continue = True 
     print "listening" 
     while should_continue: 
      print "hello" 
      socks = dict(poller.poll()) 
      print poller 
      if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN: 
       message = self.socket_pull.recv() 
       print "Recieved control command: %s" % message 
       if message == "Exit": 
        print "Recieved exit command, client will stop recieving messages" 
        should_continue = False 

       if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN: 
        string = self.socket_sub.recv() 
        topic, messagedata = string.split() 
        print "Processing ... ", topic, messagedata 

def client(port_push, port_sub): 
    context = zmq.Context() 
    socket_pull = context.socket(zmq.PULL) 
    socket_pull.connect ("tcp://localhost:%s" % port_push) 
    print "Connected to server with port %s" % port_push 
    socket_sub = context.socket(zmq.SUB) 
    socket_sub.connect ("tcp://localhost:%s" % port_sub) 
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
    print "Connected to publisher with port %s" % port_sub 
    # Initialize poll set 
    poller = zmq.Poller() 
    poller.register(socket_pull, zmq.POLLIN) 
    poller.register(socket_sub, zmq.POLLIN) 
    # Work on requests from both server and publisher 
    should_continue = True 
    while should_continue: 
     socks = dict(poller.poll()) 
     if socket_pull in socks and socks[socket_pull] == zmq.POLLIN: 
      message = socket_pull.recv() 
      print "Recieved control command: %s" % message 
      if message == "Exit": 
       print "Recieved exit command, client will stop recieving messages" 
       should_continue = False 

     if socket_sub in socks and socks[socket_sub] == zmq.POLLIN: 
      string = socket_sub.recv() 
      topic, messagedata = string.split() 
      print "Processing ... ", topic, messagedata 

if __name__ == "__main__": 
    # Now we can run a few servers 
    server_push_port = "5556" 
    server_pub_port = "5558" 
    Process(target=server_push, args=(server_push_port,)).start() 
    Process(target=server_pub, args=(server_pub_port,)).start() 
    #~ Process(target=client,args=(server_push_port,server_pub_port)).start() 
    Process(target=Client(server_push_port,server_pub_port)).start() 
+0

在与子模块有限的工作我已经注意到,它可以难以诊断异常情况 - 你确定你已经正确诊断并且你没有在其他地方发现异常吗?我过去做过的一件事是明确地将子进程分解为我可以在自己的控制台窗口中监视的进程。 – 2012-03-19 13:26:05

+0

这只是一个测试代码,没有什么隐藏的。无论是假想中断,发生在pyzmq内。我希望有一个解决方法,以便不必等待pyzmq中的修复。 我同意使用多进程时很难调试,但它提供的便利超过了这个问题。 – fccoelho 2012-03-19 13:41:09

回答

2

EDIT1:这是不太正确的,给我一些时间来得到它的权利...

我想你可能会调用客户端类的错误的方式。我不是这方面的专家,但我认为你的客户端应该从Process进行分类,然后使用.start()函数运行。所以,定义你的客户端类是这样的:

class Client(Process): 
    def __init__(self, port_push, port_sub): 
     (...) # your class init code here...make sure indentation is correct 

然后在您运行服务器端,创建您的客户端类的实例并启动它,像这样:

client_class = Client(port_push, port_sub) 
client_class.start() 

EDIT2:这里是编辑的fccoelho代码版本适用于我。

最大的问题似乎是ZMQ初始化的东西需要在__call__方法中完成,而不是在__init__中完成。我怀疑这是由于如何在多处理中分配内存,因为__init__函数将在父进程中完成,而__call__函数在具有单独内存空间的子进程中完成。显然ZMQ不喜欢这个。我还添加了一些等待时间来防止客户端在服务器准备就绪之前连接到服务器,并防止服务器在客户端订阅之前发送消息。还使用127.0.0.1而不是本地主机(由于某种原因,我的电脑不喜欢localhost)。还删除了客户端轮询调用周围烦人的打印消息,并修复了客户端在pubsub套接字上检查轮询结果的缩进问题。

import zmq 
import time 
import sys 
import random 
from multiprocessing import Process 

def server_push(port="5556"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUSH) 
    socket.bind("tcp://127.0.0.1:%s" % port) 
    print "Running server on port: ", port 
    time.sleep(1.0) 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     if reqnum < 6: 
      socket.send("Continue") 
     else: 
      socket.send("Exit") 
      print 'Push server sent "Exit" signal' 
      break 
     time.sleep(0.4) 

def server_pub(port="5558"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUB) 
    socket.bind("tcp://127.0.0.1:%s" % port) 
    socket.setsockopt(zmq.HWM, 1000) 
    publisher_id = random.randrange(0,9999) 
    print "Running server on port: ", port 
    time.sleep(1.0) 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     # Wait for next request from client 
     topic = random.randrange(8,10) 
     messagedata = "server#%s" % publisher_id 
     print "%s %s" % (topic, messagedata) 
     socket.send("%d %s" % (topic, messagedata)) 
     time.sleep(0.4)  


class Client: 
    def __init__(self,port_push, port_sub): 
     self.port_push = port_push 
     self.port_sub = port_sub 
     # Initialize poll set 

    def __call__(self): 
     time.sleep(0.5) 
     print 'hello from class client!' 
     context = zmq.Context() 
     self.socket_pull = context.socket(zmq.PULL) 
     self.socket_pull.connect ("tcp://127.0.0.1:%s" % self.port_push) 
     print "Connected to server with port %s" % self.port_push 
     self.socket_sub = context.socket(zmq.SUB) 
     self.socket_sub.connect ("tcp://127.0.0.1:%s" % self.port_sub) 
     self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
     print "Connected to publisher with port %s" % self.port_sub 

     poller = zmq.Poller() 
     poller.register(self.socket_pull, zmq.POLLIN) 
     poller.register(self.socket_sub, zmq.POLLIN) 
     # Work on requests from both server and publisher 
     should_continue = True 
     print "listening" 
     while should_continue: 
      # print "hello" 
      socks = dict(poller.poll()) 
      # print poller 
      if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN: 
       message = self.socket_pull.recv() 
       print "Recieved control command: %s" % message 
       if message == "Exit": 
        print "Recieved exit command, client will stop recieving messages" 
        should_continue = False 

      if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN: 
       string = self.socket_sub.recv() 
       topic, messagedata = string.split() 
       print "Processing ... ", topic, messagedata 

def client(port_push, port_sub): 
    print 'hello from function client!' 
    context = zmq.Context() 
    socket_pull = context.socket(zmq.PULL) 
    socket_pull.connect ("tcp://127.0.0.1:%s" % port_push) 
    print "Connected to server with port %s" % port_push 
    socket_sub = context.socket(zmq.SUB) 
    socket_sub.connect ("tcp://127.0.0.1:%s" % port_sub) 
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
    print "Connected to publisher with port %s" % port_sub 
    # Initialize poll set 
    poller = zmq.Poller() 
    poller.register(socket_pull, zmq.POLLIN) 
    poller.register(socket_sub, zmq.POLLIN) 
    # Work on requests from both server and publisher 
    should_continue = True 
    while should_continue: 
     socks = dict(poller.poll(1000)) 
     if socket_pull in socks and socks[socket_pull] == zmq.POLLIN: 
      message = socket_pull.recv() 
      print "Recieved control command: %s" % message 
      if message == "Exit": 
       print "Recieved exit command, client will stop recieving messages" 
       should_continue = False 

     if socket_sub in socks and socks[socket_sub] == zmq.POLLIN: 
      string = socket_sub.recv() 
      topic, messagedata = string.split() 
      print "Processing ... ", topic, messagedata 

if __name__ == "__main__": 
    # Now we can run a few servers 
    server_push_port = "5556" 
    server_pub_port = "5558" 
    Process(target=server_push, args=(server_push_port,)).start() 
    Process(target=server_pub, args=(server_pub_port,)).start() 
    # Process(target=client,args=(server_push_port,server_pub_port)).start() 
    Process(target=Client(server_push_port,server_pub_port)).start() 

最后,这里有一个更清洁的实现多进程发布订阅的这是非常裸机,但更明确地说明了事情:

import zmq 
from multiprocessing import Process 
import time 

class ServerPubSub(Process): 
    def __init__(self, port, n): 
     Process.__init__(self) 
     self.port = port 
     self.n = n 

    def run(self): 
     self.context = zmq.Context() 
     self.pub = self.context.socket(zmq.PUB) 
     self.pub.bind('tcp://127.0.0.1:%d' % self.port) 
     self.pub.setsockopt(zmq.HWM, 1000) 

     time.sleep(1) 

     end = False 
     for i in range(self.n): 
      print 'SRV: sending message %d' % i 
      self.pub.send('Message %d' % i) 
      print 'SRV: message %d sent' % i 
      time.sleep(0.2) 

     self.pub.close() 

class ClientPubSub(Process): 
    def __init__(self, port, n): 
     Process.__init__(self) 
     self.port = port 
     self.n = n 

    def run(self): 
     self.context = zmq.Context() 
     self.sub = self.context.socket(zmq.SUB) 
     self.sub.connect('tcp://127.0.0.1:%d' % self.port) 
     self.sub.setsockopt(zmq.SUBSCRIBE, '') 
     self.poller = zmq.Poller() 
     self.poller.register(self.sub, zmq.POLLIN) 

     end = False 
     count = 0 
     while count < self.n: 
      ready = dict(self.poller.poll(0)) 
      if self.sub in ready and ready[self.sub] == zmq.POLLIN: 
       msg = self.sub.recv() 
       print 'CLI: received message "%s"' % msg 
       count += 1 

     self.sub.close() 

if __name__ == "__main__": 
    port = 5000 
    n = 10 
    server = ServerPubSub(port, n) 
    client = ClientPubSub(port, n) 

    server.start() 
    client.start() 

    server.join() 
    client.join() 
+0

这并没有改变任何东西,我正在做的就是你的子类。当一个进程启动时,目标可调用被调用。如果你运行上面的代码,你会看到__call__方法被调用,因为打印“聆听”。这是对poller.poll()的调用,挂起... – fccoelho 2012-03-19 13:37:01

+0

当我第一次看到你的代码时,你的缩进被搞砸了,它看起来像你正在定义客户类以外的__call__。 – 2012-03-19 13:41:39

+0

不,缩进问题是当我将代码粘贴到stackoverflow。我已经在这里修复了它,但是我的代码从一开始就正确缩进了 – fccoelho 2012-03-19 14:57:50