2011-06-05 13 views
5

这是我的脚本。zero mq酒吧/分与多部分不工作


#!/usr/bin/env python 

import traceback 
import sys 
import zmq 
from time import sleep 

print "Creating the zmq.Context" 
context = zmq.Context() 

print "Binding the publisher to the local socket at port 5557" 
sender = context.socket(zmq.PUB) 
sender.bind("tcp://*:5557") 

print "Binding the subscriber to the local socket at port 5557" 
receiver = context.socket(zmq.SUB) 
receiver.connect("tcp://*:5557") 

print "Setting the subscriber option to get only those originating from \"B\"" 
receiver.setsockopt(zmq.SUBSCRIBE, "B") 

print "Waiting a second for the socket to be created." 
sleep(1) 

print "Sending messages" 
for i in range(1,10): 
    msg = "msg %d" % (i) 
    env = None 
    if i % 2 == 0: 
     env = ["B", msg] 
    else: 
     env = ["A", msg] 
    print "Sending Message: ", env 
    sender.send_multipart(env) 

print "Closing the sender." 
sender.close() 

failed_attempts = 0 
while failed_attempts < 3: 
    try: 
     print str(receiver.recv_multipart(zmq.NOBLOCK)) 
    except: 
     print traceback.format_exception(*sys.exc_info()) 
     failed_attempts += 1 

print "Closing the receiver." 
receiver.close() 

print "Terminating the context." 
context.term() 

""" 
Output: 

Creating the zmq.Context 
Binding the publisher to the local socket at port 5557 
Binding the subscriber to the local socket at port 5557 
Setting the subscriber option to get only those originating from "B" 
Waiting a second for the socket to be created. 
Sending messages 
Sending Message: ['A', 'msg 1'] 
Sending Message: ['B', 'msg 2'] 
Sending Message: ['A', 'msg 3'] 
Sending Message: ['B', 'msg 4'] 
Sending Message: ['A', 'msg 5'] 
Sending Message: ['B', 'msg 6'] 
Sending Message: ['A', 'msg 7'] 
Sending Message: ['B', 'msg 8'] 
Sending Message: ['A', 'msg 9'] 
Closing the sender. 
['B', 'msg 2'] 
['B', 'msg 4'] 
['B', 'msg 6'] 
['B', 'msg 8'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
['Traceback (most recent call last):\n', ' File "./test.py", line 43, in \n print str(receiver.recv_multipart(zmq.NOBLOCK))\n', ' File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', ' File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', ' File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', ' File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n'] 
Closing the receiver. 
Terminating the context. 
""" 

而问题是......为什么这段代码不工作?

[编辑]在zeromq邮件列表上获得超级快速响应后,我更新了上面的代码。

回答

8

信用:查雷梅斯

您可能需要插座创建步骤(绑定,连接setsockopt的)和消息的实际传输之间的“休眠”。绑定&连接操作是异步的,因此当您到达发送所有消息的逻辑时,它们可能无法完成。在这种情况下,通过PUB套接字发送的任何消息都将被丢弃,因为zmq_bind()操作在另一个套接字成功连接到该套接字之前不会创建队列。

作为一个方面说明,在本例中不需要创建2个上下文。两个套接字都可以在相同的上下文中创建。它没有伤害,但它也没有必要。

来源:彼得

有一个 “问题解决者” 在Ch1结束,说明这一点。

某些套接字类型(ROUTER和PUB)将自动丢弃没有收件人的邮件 。正如Chuck所说,连接是异步的,需要大约100毫秒。如果你启动两个线程,绑定 的一端,连接另一端,然后立即启动以通过这种套接字类型发送数据 ,则将丢失前100毫秒的数据 (大约)。

做一个睡眠是一个残酷的“证明它有效”的选项。实际上 你会以某种方式进行同步,或者(更一般地)希望消息丢失 作为正常启动的一部分(即,将发布的数据看作纯粹的 广播,没有明确的开始或结束)。

查看天气更新示例,syncpub和syncsub了解详情。