2014-02-14 108 views
1

我正在尝试使用ZMQ构建多个发布者/多订户拓扑。我创建了一个使用espresso.py示例的示例,对其进行了一些细微的修改。我想确保我所做的是正确的,因为我对zeromq相当陌生。请随时批评和评论。使用XPUB/XSUB的ZeroMQ多重发布者和订阅者 - 这是一个正确的实现吗?

我已经基本上吸取了一些教训。

  • 一个ZMQ套接字可以绑定到只有在多个流程的一个端口到一个单一的网络卡(又名普通插座)

  • 绑定并不意味着听即你可以绑定后发出connect() (对于套接字开发人员来说非常混淆,但是这不是套接字)

  • 代理和XPUB/XSUB是用来作为模式,当订阅者不需要找出并连接到所有的发布者。

我真的不喜欢关于下面的代码是每个用户绑定到一个单独的插座。虽然这是一个必要的罪恶,但不知何故,我一直认为这看起来不正确。

所以这里是我的示例代码。

# Espresso Pattern 
# This shows how to capture data using a pub-sub proxy 
# 

import time 

from random import randint 
from string import uppercase 
from threading import Thread 

import zmq 
from zmq.devices import monitored_queue 

from zhelpers import zpipe 

# The subscriber thread requests messages starting with 
# A and B, then reads and counts incoming messages. 


def subscriber_thread(): 
    ctx = zmq.Context.instance() 

    # Subscribe to "A" and "B" 
    subscriber = ctx.socket(zmq.SUB) 
    subscriber.connect("tcp://localhost:6001") 
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A") 
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B") 

    count = 0 
    while True: 
     try: 
      msg = subscriber.recv_multipart() 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 
      else: 
       raise 
     count += 1 

    print ("Subscriber received %d messages" % count) 


# .split publisher thread 
# The publisher sends random messages starting with A-J: 

def publisher_thread(port, char): 
    ctx = zmq.Context.instance() 

    publisher = ctx.socket(zmq.PUB) 
    publisher.bind("tcp://*:"+str(port)) 

    while True: 
     string = "%s-%05d" % (char, randint(port, port+500)) 
     try: 
      publisher.send(string) 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 
      else: 
       raise 
     time.sleep(0.1)   # Wait for 1/10th second 

# .split listener thread 
# The listener receives all messages flowing through the proxy, on its 
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects 
# attached child threads via inproc. In other languages your mileage may vary: 

def listener_thread(pipe): 

    # Print everything that arrives on pipe 
    while True: 
     try: 
      print (pipe.recv_multipart()) 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 


# .split main thread 
# The main task starts the subscriber and publisher, and then sets 
# itself up as a listening proxy. The listener runs as a child thread: 

def main(): 

    # Start child threads 
    ctx = zmq.Context.instance() 
    p_thread1 = Thread(target=publisher_thread, args=(6000,'A')) 
    p_thread2 = Thread(target=publisher_thread, args=(7000,'B')) 
    s_thread = Thread(target=subscriber_thread) 
    p_thread1.start() 
    p_thread2.start() 
    s_thread.start() 

    pipe = zpipe(ctx) 

    subscriber = ctx.socket(zmq.XSUB) 
    subscriber.connect("tcp://localhost:6000") 
    subscriber.connect("tcp://localhost:7000") 

    publisher = ctx.socket(zmq.XPUB) 
    publisher.bind("tcp://*:6001") 

    l_thread = Thread(target=listener_thread, args=(pipe[1],)) 
    l_thread.start() 

    try: 
     monitored_queue(subscriber, publisher, pipe[0], 'pub', 'sub') 
    except KeyboardInterrupt: 
     print ("Interrupted") 

    del subscriber, publisher, pipe 
    ctx.term() 

if __name__ == '__main__': 
    main() 
+1

好现在我很困惑。上面的代码实际上是不正确的。根据XPUB/XSUB文档,它应该是XPUB/XSUB端的bind(),而订户和发布者都应该使用connect()(代码连接PDF上的第1卷,第48页)。我再也不能在这里上传代码,但你明白了。 – vivekv

回答

2

我在ZeroMQ github页面上提出了一个问题,并得到了回复。这是ZeroMQ中的一个已知错误,这是由于订阅消息的接收方完全准备好之前发布和订阅发生在提出订阅请求的不同线程中。更多详细信息可以在这里找到。

https://github.com/zeromq/libzmq/issues/897

我试图模仿这里的问题

https://gist.github.com/vivekfantain/9021979

分享这一切都为别人谁在同样的问题绊倒。

+0

更重要的一点 - XPUB/XSUB在ZMQ 2.xx版本中被严重破坏。我设法让它只与4.x版本一起工作。 – vivekv

相关问题