2014-10-16 22 views
0

我想了解如何有效地使用ZMQ多线程(所以发送不阻止接收和阻止发送)。是否有可能以某种方式在ZMQ中同时使用Send/Recv(通过多线程)?

我想使用ZMQ_DONTWAIT标志,但是在发送数据时,它有时不会被发送(EAGAIN错误,所以我将不得不重新排队消息,这在处理兆字节数据时浪费资源)。

我没有想出下面的代码:

Concurrency::concurrent_queue<zmq::message_t> QUEUE_IN; 
Concurrency::concurrent_queue<zmq::message_t> QUEUE_OUT; 

void SendThread(zmq::context_t &context) { 
    zmq::socket_t zmq_socket(context, ZMQ_DEALER); 
    zmq_socket.connect(string_format("tcp://%s:%s", address, port).c_str()); 
    zmq::message_t reply; 
    while (true) { 
     while (QUEUE_OUT.try_pop(reply)) 
      zmq_socket.send(reply); 
     Sleep(1); 
    } 
} 

void RecvThread(zmq::context_t &context) { 
    zmq::socket_t zmq_socket(context, ZMQ_DEALER); 
    zmq_socket.connect(string_format("tcp://%s:%s", address, port).c_str()); 
    zmq::message_t reply; 
    while (true) { 
     while (zmq_socket.recv(&reply)) 
      QUEUE_IN.push(reply); 
    } 
} 

void ConnectionThread() 
{ 
    zmq::context_t context(1); 
    std::thread* threads[2] = { 
     new std::thread(SendThread, context), 
     new std::thread(RecvThread, context) 
    }; 
    threads[0]->join(); 
} 

然而这将需要在服务器端有两个插座,我会需要确定,而我需要发送的数据和我需要在服务器端收听,对吧? 有没有办法使用一个套接字,但在多线程环境中使用发送和接收?

我可能会喜欢在一个套接字上使用asychroniously,但是在研究异步样本之后,我仍然没有把握这个想法,因为没有太多的评论。

回答

0
  1. 当发送大量数据(你说你在单条消息中发送MB数据)时,需要一些时间,ZMQ不会“双工”发送和接收,以便它们都可以实际发生。 DONTWAIT标志不会对您有太大帮助,其目的是确保您在执行非ZMQ操作时不会等待ZMQ。所有消息在任何情况下都应该排队等待(不受高水位标记的干扰)
  2. 只有安全使用多线程并行发送和接收的方式是使用多个套接字。

但是,这并不全是坏事。如果您使用一个指定的发送套接字和一个指定的接收套接字,那么您可以使用pub/sub打开一些有趣的选项。

+0

您不需要多个上下文,只需要多个线程。 – 2014-10-17 16:15:04

+0

@JohnJefferies你是对的,我误解了警告[在这里指南](http://zguide.zeromq.org/page:all#Getting-the-Context-Right),单独的过程需要一个单独的上下文,单独线程不。我相应地更新了我的答案。 – Jason 2014-10-17 17:58:46

0

避免睡眠

为了避免睡眠,你可以使用ZMQ_POLLOUT事件,以保护发送使用zmq_poll()()。您不需要使用ZMQ_DONTWAIT。 [我用了C函数在那里,你的绑定将有相当于]

路由到RecvThread

一个不能线程之间共享插槽,所以需要2个插槽用于这项工作。服务器只需要一个绑定到2个端口的套接字(可能是ROUTER)。当它收到消息时,它将需要知道在哪里发送答复...

当ROUTER套接字接收到消息时,zmq内部使用发送者的身份向消息添加一个帧。这个帧将被服务器代码看到,当构建一条消息来回复发送者时,它通常会使用相同的标识帧。在你的情况下,这是客户端的SendThread。 OTOH,你想回复客户端的接收套接字,所以身份框架必须是这样的。

唯一剩下的就是服务器如何获取客户端的接收套接字的身份框架。为此,你需要发明一个小协议。安排客户端的RecvThread发送一条消息到服务器几乎就足够了。服务器应该理解该消息并简单地保留客户端的接收套接字的身份框架,并在构建回复消息时使用它的副本。

所有这些都在“探索ROUTER套接字”下的指南中进行了说明。

相关问题