2016-07-14 46 views
1

ZeroMQ REQ/REP performance

Consider the following two processes:

sender.cpp:

#include <zhelpers.h> 
... 
zmq::context_t ctx(1); 
... 
void foo(int i) 
{ 
    zmq::socket_t sender(ctx, ZMQ_REQ); 
    sender.connect("tcp://hostname:5000"); 

    std::stringstream ss; 
    ss <<"bar_" <<i; 
    std::string bar_i(ss.str());

    s_sendmore(sender, "foo ");
    (i != N) ? s_send(sender, bar_i, 0) : s_send(sender, "done", 0);
    s_recv(sender); 
} 

int main() 
{ 
    for(int i=0; i<=100000; ++i) 
     foo(i); 
    return 0; 
} 

receiver.cpp

#include <zhelpers.h> 
... 
int main() 
{ 
    zmq::context_t ctx(1); 
    zmq::socket_t rcv(ctx, ZMQ_REP); 
    rcv.bind("tcp://*:5000"); 

    std::string s1(""); 
    std::string s2(""); 

    while(s2 != "done") 
    { 
     s1 = std::move(s_recv(rcv)); 
     s2 = std::move(s_recv(rcv)); 
     std::cout <<"received: " <<s1 <<" " <<s2 <<"\n"; 
     s_send(rcv, "ACK"); 
    } 

    return 0; 
} 

Let's start both processes. What I would expect is that the receiver process receives everything the sender sends to it and prints:

foo bar_1 
foo bar_2 
... 

and so on, until:

... 
foo bar_100000 

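I would expect it to do this without any blocking.

My problem is that the receiver always stalls at around the 28215th iteration (always around that number!) and blocks for about a minute. Then it continues toward 100000, but sometimes it gets stuck again. My question is of course: why does this happen, and how can I fix it?

I tried moving 'sender' out of foo(.) into global scope, and then it worked: in that case all the printouts from 1 to 100000 went through smoothly and very quickly, without any blocking (of course, in that case a socket is not created on every call to foo(.)). Unfortunately, I cannot do that in my code.

I would like to understand why this blocking occurs.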

+0

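The maximum number of sockets may be limited on the server side. Increasing it might solve the problem, because TCP takes time to clean up dead sockets, and you have so many of them that you hit the maximum number of sockets. – somdoron

Answer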

0

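First of all, your examples are not really workable, because they do not compile. So here are some examples that should be close to your intent and that actually compile.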

sender.cpp

#include <zmq.hpp>
#include <cstring>
#include <iostream>
#include <string>

void send(const std::string& msg)
{
    // Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to receiver ..." << std::endl;
    socket.connect ("tcp://localhost:5555");

    // Fixed 100-byte frame: zero-fill it, then copy at most 99 bytes so the
    // payload stays null-terminated and we never read past the end of msg
    zmq::message_t request (100);
    std::memset (request.data(), 0, 100);
    msg.copy (static_cast<char*>(request.data()), 99);
    std::cout << "Sending message " << msg << "..." << std::endl;
    socket.send (request);
} 

int main() 
{ 
    for(int i = 0; i < 100000; ++i) 
    { 
     send(std::to_string(i)); 
    } 
    send("done"); 
} 

compiled with something like

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include sender.cpp -lzmq -o sender 

receiver.cpp

#include <zmq.hpp> 
#include <string> 
#include <cstring> 
#include <iostream> 

int main() { 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REP); 
    socket.bind ("tcp://*:5555"); 

    char buf[100] = {0}; 
    while (std::string(buf).compare("done")) { 
     zmq::message_t request; 

     // Wait for next request from client 
     socket.recv (&request); 
     std::memcpy(buf, request.data(), 100); 
     std::cout << "Received message " << buf << std::endl; 

     // Send reply back to client 
     zmq::message_t reply (5); 
     memcpy (reply.data(), "Hello", 5); 
     socket.send (reply); 
    } 
    return 0; 
} 

compiled with

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include receiver.cpp -lzmq -o receiver 

When I start the processes, everything seems to work fine. As expected, the receiver prints its output without any pause:

Received message 99996 
Received message 99997 
Received message 99998 
Received message 99999 
Received message done 

But, as I expected, take a look at netstat:

netstat 
Active Internet connections (w/o servers) 
Proto Recv-Q Send-Q Local Address   Foreign Address   State  
tcp  0  0 localhost:38345   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46228   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60309   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46916   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:47600   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:54454   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46409   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51142   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40355   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40005   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45614   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48974   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41427   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58740   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58754   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60044   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57478   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:50419   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:44361   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:37284   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:38662   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45968   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57407   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:59200   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41292   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:55243   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51489   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48865   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:35491   localhost:5555   TIME_WAIT 
... 

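After a single run I have more than 20k (!) of these sockets sitting in the TIME_WAIT state. This is because of the scope of socket inside void send(...). I am not sure exactly what zmq does when it destroys a socket that goes out of scope, but I am sure it calls close() on the socket's fd, which leaves your socket in the TIME_WAIT state. Even though my sender and receiver ran smoothly, I do not know how your system copes with that many sockets. Also, I do not know what your zhelpers.h file is doing. What I do know is that if you put the socket in global scope, close() is called only once on the sender side, for a single socket. I would start investigating further from here. Maybe have a look at how-to-forcibly-close-a-socket-in-time-wait ...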
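To make the TIME_WAIT argument concrete, here is a minimal sketch of a toy model, not ZeroMQ code. It assumes that every new connection takes one ephemeral port and that a closed connection keeps its port busy for a fixed time. The function name and the numbers are illustrative assumptions: on many Linux systems the default ephemeral port range is roughly 28,000 ports (32768 to 60999 is a common default) and TIME_WAIT lasts about 60 seconds, which would be consistent with a stall near the 28215th iteration that clears after about a minute. Whether the asker's machine uses these defaults is not known.

```cpp
#include <cstddef>
#include <deque>

// Hypothetical model, not part of ZeroMQ: one connection is opened and
// immediately closed per tick, and a closed connection keeps its port busy
// for `time_wait_ticks` ticks. Returns the index of the first connection
// that finds no free port, or `total` if every connection gets a port.
std::size_t first_stalled_connection(std::size_t port_count,
                                     std::size_t time_wait_ticks,
                                     std::size_t total)
{
    std::deque<std::size_t> release_tick;  // ticks at which busy ports free up
    for (std::size_t tick = 0; tick < total; ++tick) {
        // Give back the ports whose TIME_WAIT period has elapsed
        while (!release_tick.empty() && release_tick.front() <= tick)
            release_tick.pop_front();
        if (release_tick.size() == port_count)
            return tick;  // every port is still in TIME_WAIT
        // Open, use, and close a connection on a free port
        release_tick.push_back(tick + time_wait_ticks);
    }
    return total;
}
```

In this model, if the sender closes connections far faster than TIME_WAIT expires, the first stall happens after exactly `port_count` connections. If TIME_WAIT is short compared with the connection rate, the loop never stalls.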

+0

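Thanks, I will check. Sorry that my code did not compile. I just wanted to show you the problem itself (that is why I used '...'). I did not want to go into every detail. zhelpers.h can be found here, for example: https://github.com/imatix/zguide2/tree/master/examples/C%2B%2B – gybacsi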

+1

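Another thing: maybe you just do not want to declare your sending socket variable in global scope, but if you widen its scope only a little, you can solve the problem of thousands of sockets in the TIME_WAIT state. That is, declare it outside the send loop and reuse it at least for sending the 100000 messages. – yussuf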
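The scoping suggestion above can be sketched without a ZeroMQ install by using a stand-in type. `CountingSocket`, `send_with_new_socket`, `send_with_shared_socket`, and `closes_for` are hypothetical names for illustration only. The stand-in merely counts constructions and destructions, where a real `zmq::socket_t` over TCP would open and close a connection. With the real library the same structure would mean declaring the `zmq::socket_t` once before the loop and passing it to `foo` by reference.

```cpp
#include <string>

// Stand-in for zmq::socket_t (hypothetical, for illustration only): it just
// counts how many sockets were opened and closed.
struct CountingSocket {
    static int opened;
    static int closed;
    CountingSocket() { ++opened; }
    ~CountingSocket() { ++closed; }  // a real TCP socket would be closed here
    CountingSocket(const CountingSocket&) = delete;
    CountingSocket& operator=(const CountingSocket&) = delete;
    void send(const std::string&) {}  // no-op in this sketch
};
int CountingSocket::opened = 0;
int CountingSocket::closed = 0;

// Pattern from the question: a new socket per message, closed on return
void send_with_new_socket(const std::string& msg)
{
    CountingSocket socket;
    socket.send(msg);
}

// Suggested pattern: the caller owns the socket and passes it in
void send_with_shared_socket(CountingSocket& socket, const std::string& msg)
{
    socket.send(msg);
}

// Sends `count` messages with the chosen pattern and returns the number of
// sockets that were closed along the way
int closes_for(int count, bool reuse)
{
    CountingSocket::opened = CountingSocket::closed = 0;
    if (reuse) {
        CountingSocket socket;  // declared once, outside the send loop
        for (int i = 0; i < count; ++i)
            send_with_shared_socket(socket, std::to_string(i));
    } else {
        for (int i = 0; i < count; ++i)
            send_with_new_socket(std::to_string(i));
    }
    return CountingSocket::closed;
}
```

Calling `closes_for(100000, false)` closes one socket per message, while `closes_for(100000, true)` closes a single socket, which is the difference the comment is pointing at. The socket does not need to be global: it only has to outlive the loop.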