我注意到一个zeromq PUB套接字缓冲所有传出的数据,如果它是连接,例如ZeroMQ PUB套接字缓冲区我所有的传出数据时,它连接
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print sub.recv()
子这些消息后结合,他们应该被删除,因为如果没有人连接到它,PUB应该放弃消息。但不是丢弃消息,而是缓存所有消息。
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi
正如你所看到的,那些“消息不应该被丢弃”是由插座缓冲,一旦它被连接时,它们刷新到SUB插口。如果我在PUB套接字处绑定,并在SUB套接字处连接,则它可以正常工作。
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print repr(sub.recv())
而且你只能看到输出
'hi'
这种奇怪的行为引起一个问题,它缓存的连接插座上的所有数据,我有两台服务器,服务器A将数据发布到服务器乙
Server A -- publish --> Server B
如果服务器B上线后,它工作正常。但是如果我启动服务器A并且不启动服务器B呢?
其结果是,在服务器A上的连接插座PUB保持所有这些数据,内存使用率变高。
这里的问题是,这种行为是不是一个错误或功能?如果是功能,我在哪里可以找到提及此行为的文档?我怎样才能停止连接PUB套接字缓冲所有数据?
谢谢。
我知道我可以设置HWM来限制缓冲区中的消息号。但是它并没有解决问题,PUB处理HWM状态的方式就是删除新消息。这意味着如果您设置HWM,则只有前导消息保存在缓冲区中。我正在写的是音频流媒体系统。这种行为使得使用起来非常烦人。比方说,你发送消息[1,2,3,4],然后HWM被设置为2,那么套接字会为你缓冲[1,2],所有新消息都会被丢弃。但对于音频流,最重要的部分是新的数据。有什么方法可以调整HWM如何丢弃信息? –
啊,所以你的意思是你想要的行为是,如果HWM设置为2并且你发送[1,2,3,4],那么它应该[1,2]并且保持[3,4],但是然后如果你发送了5,它应该减少3,并最终以[4,5]结束?我不认为ZMQ中存在这种行为。 – aculich
这非常有趣。当然,有些应用程序需要删除“旧”消息的能力(IP电话是一个常见的例子)。 –