2014-11-13 153 views
6

当您使用简单的ZeroMQ REQ/REP模式时,您依赖于固定的send() - > recv()/ recv() - > send()序列。 由于this文章描述了当参与者在请求中断时出现问题,因为那时你不能重新开始接收来自另一个连接的下一个请求,但是状态机会强制你发送一个请求给断开连接一。zeromq:重置REQ/REP套接字状态

自从上面提到的文章已经写出以来,是否出现了一种更优雅的方法来解决这个问题?

正在重新连接(从不使用REQ/REP除了而是用另一种模式)解决了这个

+1

你可能想要切换到在[导引]所述的路由器/经销商基于可靠的请求响应中的一个(AKA海盗)图案(http://zguide.zeromq.org/page :所有#Chapter-Reliable-Request-Reply-Patterns) –

回答

5

好消息是,随着ZMQ 3.0及更高版本(当今时代),您可以在套接字上设置超时。正如其他人所指出的其他地方,你必须这样做,你已经创建了插座后,但在连接之前:

zmq_req_socket.setsockopt(zmq.RCVTIMEO, 500) # milliseconds

然后,当你真正尝试接收应答(您发送一条消息后REP套接字)时,您可以捕获如果超时超时将导致的错误:

try: 
    send(message, 0) 
    send_failed = False 

except zmq.Again: 
    logging.warning("Image send failed.") 
    send_failed = True 

但是!当这种情况发生时,正如其他地方所观察到的那样,你的套接字将处于一个有趣的状态,因为它仍然期待响应。在这一点上,除了重新启动套接字之外,我找不到任何可靠工作的东西。请注意,如果断开()套接字然后重新连接(),它仍然处于这种不良状态。因此,你需要

def reset_my_socket: 
    zmq_req_socket.close() 
    zmq_req_socket = zmq_context.socket(zmq.REQ) 
    zmq_req_socket.setsockopt(zmq.RCVTIMEO, 500) # milliseconds 
    zmq_req_socket.connect(zmq_endpoint) 

您还会注意到,因为我关闭()D插座,接收超时选项被“丢失”的,所以重要的是一套在新的插座。

我希望这会有所帮助。我希望这不能成为这个问题的最佳答案。 :)

+0

这仍然使REP套接字处于不良状态,因为它试图发送从未收到的回复。 – orodbhen

1

有一个解决这个问题的唯一方式,它是将超时到所有通话。由于ZeroMQ本身并不真正提供简单的超时功能,所以我建议使用ZeroMQ套接字的一个子类,它为所有重要的调用添加一个超时参数。

因此,不是调用s.recv(),而是调用s.recv(timeout = 5.0),如果响应没有在5秒钟内返回,它将返回None并停止阻塞。当我遇到这个问题时,我做了一个徒劳无益的尝试。

+3

我目前没有使用python测试它的zeromq 4,但我相信你推荐的这种方法将解决不确定等待的问题,但不能解决状态机问题。然后你可以再次调用'recv()'而不是'send()'。然后你会得到一个异常,如'zmq.error.ZMQError:操作无法在当前状态下完成。告诉我,如果我错了(我会尝试它,但我必须安装最新版本的'pyzmq') – frans

+0

我的经验与您描述的frans相同。如果REQ套接字已经发送了send()调用但从未收到响应,那么它就会等待来自某个地方的响应。所以我可以凭经验确认你的直觉。 – user3162307

+0

正如我上面评论的那样,即使您重置REQ套接字,REP套接字仍然会处于不良状态。它停留在回复模式,而不是接收模式。 – orodbhen

1

我现在实际上正在研究这个问题,因为我很适合传统系统。

我不断遇到需要了解连接状态的“需要”代码。但是,我想转移到图书馆推动的消息传递范式。

我发现了以下功能:zmq_socket_monitor

它的作用是监视传递给它的插槽,并生成然后传递给一个“进程内”终点事件 - 在这一点上,你可以添加处理代码实际上做一些东西。

还有一个例子(实际测试代码)在这里:github

我没有得到任何具体的代码给的那一刻(也许在一周结束),但我的本意是向响应连接和断开连接,以便我可以实际执行所需逻辑的任何重置。

希望这有助于,尽管引用4.2文档,我使用的4.0.4似乎也有功能 以及。

注意我注意到你说说上面蟒蛇,但问题是标签C++因此,这就是我的答案是来自......

11

由于接受的答案对我来说似乎非常难过,所以我做了一些研究并发现我们需要的所有东西实际上都在文档中。

.setsockopt()用正确的参数可以帮助你重新设置您的套接字状态机没有惨遭摧毁它,重建另一个比上一个人死亡的身体上。

(是的,我喜欢图片)。

ZMQ_REQ_CORRELATE: match replies with requests
The default behavior of REQ sockets is to rely on the ordering of messages to match requests and responses and that is usually sufficient. When this option is set to 1 , the REQ socket will prefix outgoing messages with an extra frame containing a request id. That means the full message is (request id , identity , 0 , user frames…). The REQ socket will discard all incoming messages that don't begin with these two frames.
Option value type int
Option value unit 0 , 1
Default value 0
Applicable socket types ZMQ_REQ

ZMQ_REQ_RELAXED: relax strict alternation between request and reply
By default, a REQ socket does not allow initiating a new request with zmq_send(3) until the reply to the previous one has been received. When set to 1, sending another message is allowed and has the effect of disconnecting the underlying connection to the peer from which the reply was expected, triggering a reconnection attempt on transports that support it. The request-reply state machine is reset and a new request is sent to the next available peer.
If set to 1 , also enable ZMQ_REQ_CORRELATE to ensure correct matching of requests and replies. Otherwise a late reply to an aborted request can be reported as the reply to the superseding request.
Option value type int
Option value unit 0 , 1
Default value 0
Applicable socket types ZMQ_REQ

A complete documentation is here