2012-01-21 30 views
13

我注意到一个zeromq PUB套接字缓冲所有传出的数据,如果它是连接,例如ZeroMQ PUB套接字缓冲区我所有的传出数据时,它连接

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.connect("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.bind("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print sub.recv() 

子这些消息后结合,他们应该被删除,因为如果没有人连接到它,PUB应该放弃消息。但不是丢弃消息,而是缓存所有消息。

a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
hi 

正如你所看到的,那些“消息不应该被丢弃”是由插座缓冲,一旦它被连接时,它们刷新到SUB插口。如果我在PUB套接字处绑定,并在SUB套接字处连接,则它可以正常工作。

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.bind("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.connect("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print repr(sub.recv()) 

而且你只能看到输出

'hi' 

这种奇怪的行为引起一个问题,它缓存的连接插座上的所有数据,我有两台服务器,服务器A将数据发布到服务器乙

Server A -- publish --> Server B 

如果服务器B上线后,它工作正常。但是如果我启动服务器A并且不启动服务器B呢?

其结果是,在服务器A上的连接插座PUB保持所有这些数据,内存使用率变高。

这里的问题是,这种行为是不是一个错误或功能?如果是功能,我在哪里可以找到提及此行为的文档?我怎样才能停止连接PUB套接字缓冲所有数据?

谢谢。

回答

6

是否插座块或滴剂消息取决于如在ZMQ::Socket documentation描述的插座型(下面重点是矿):

ZMQ :: HWM:检索高水位标记

的ZMQ: :HWM的选择应检索的 指定套接字的高水位。高水位标记是未完成消息的最大数量的硬限制0MQ将在指定套接字与之通信的单个对等体的任何 内存中排队。

如果已达到极限插座应进入一个特殊的 状态,并根据套接字类型,0MQ应采取适当行动 如阻塞或删除发送的消息。有关每种套接字类型所采取的确切的 动作的详细信息,请参阅ZMQ :: Socket中的单个套接字说明的 。

零的默认ZMQ :: HWM值表示“无限制”。

你可以看它是否会阻止或通过的说明文件,插槽类型为ZMQ::HWM option action这要么是BlockDrop看下降。

ZMQ::PUB的动作是Drop,所以如果它是不会放弃你应该检查HWM(高水位)值,并听取这一缺省值为零ZMQ :: HWM值意味着“无极限”的警告,这意味着在系统内存不足之前它不会进入异常状态(在这一点上我不知道它是如何表现的)。

+0

我知道我可以设置HWM来限制缓冲区中的消息号。但是它并没有解决问题,PUB处理HWM状态的方式就是删除新消息。这意味着如果您设置HWM,则只有前导消息保存在缓冲区中。我正在写的是音频流媒体系统。这种行为使得使用起来非常烦人。比方说,你发送消息[1,2,3,4],然后HWM被设置为2,那么套接字会为你缓冲[1,2],所有新消息都会被丢弃。但对于音频流,最重要的部分是新的数据。有什么方法可以调整HWM如何丢弃信息? –

+0

啊,所以你的意思是你想要的行为是,如果HWM设置为2并且你发送[1,2,3,4],那么它应该[1,2]并且保持[3,4],但是然后如果你发送了5,它应该减少3,并最终以[4,5]结束?我不认为ZMQ中存在这种行为。 – aculich

+0

这非常有趣。当然,有些应用程序需要删除“旧”消息的能力(IP电话是一个常见的例子)。 –

0

所以结合()和连接()导致两个不同的行为。你为什么不只是选择你喜欢哪一个(好像绑定())和使用?

它确实是ZeroMQ的一个特性,一般来说,它将缓冲出去的消息直到建立连接。

+0

因为我有多个节点想要将数据发布到一个众所周知的服务器。当然,我可以绑定在PUB端,但结果是,我需要每个节点有N个地址,服务器不知道会有多少个节点。我认为绑定和连接不应该影响行为,一旦建立连接,绑定和连接没有区别,那么为什么会有所不同?我不明白:S –

+0

哦,好的。那么我认为ZeroMQ的行为和预期的一样,因此您可能只需在发送数据之前查询连接。 –

+0

@JohnZwinck选择'bind()'与'connect()'不是基于偏好,而应该基于它的使用方式。他通过服务器(发布者)上的bind()和客户端(订户)上的connect()来正确使用它。它并不总是缓冲传出的消息,而是由套接字类型和高水位标记的值来确定[参见文档](http://stackoverflow.com/a/8958699/462302 )。 – aculich

0

您应该可以使用hwm settingom酒吧套接字在套接字中设置高水位标记。它可以让你定义保存多少条消息。

1

他们在套接字上设置HWM选项。

4

我觉得这种行为是zmq_connect()的语义。 也就是:当zmq_connect()返回成功,则连接在概念上建立,因此连接PUB开始排队消息而不是丢弃

继从“ZMQ Guide”摘录了这样的提示:

与ØMQ插座理论,无所谓哪端连接,并 哪一端结合。但是,对于PUB-SUB插座,如果绑定SUB 插座并连接PUB插座,则SUB插座可能会收到旧的 消息,即在SUB启动之前发送的消息。 这是一个 绑定/连接工作方式的工件。如果可以,最好绑定PUB和 连接SUB。

zmq_connect(部分)具有一些提示,如下图所示:

到常规插座

主要差异一般来说,常规的插座呈现同步 接口要么面向连接可靠字节流 (SOCK_STREAM)或无连接不可靠数据报(SOCK_DGRAM)。 相比之下,ØMQ套接字提供了一个异步的 消息队列的抽象,具体确切的排队语义取决于使用中的 套接字类型。在传统的套接字传输 字节或离散数据报的流时,ØMQ套接字传输离散消息。

ØMQ插座是异步意味着物理 连接建立的定时和拆除,重新连接和有效的递送是 对用户透明的,并通过ØMQ自身组织的。此外,消息 可以在对等体不可用以接收它们的情况下排队。

0

这里是一个黑客工具,可以帮助...

设置你的ZMQ::HWM到一个固定的数,如10。连接后,请在循环中调用用户套接字的方法recv,直到它丢弃所有缓存的消息,然后才启动主接收循环。