2015-08-17 69 views
0

我目前正在学习如何使用lib ZeroMQ,朋友建议我使用个人项目。zmq ::代理示例不起作用()

在阅读文档并计划如何使用我的项目的lib之后,我开始使用文档给出的代码测试项目。 我使用的测试是this one。 不幸的是它不起作用。我做了一些小修改来测试它。 (我给你准确的代码,我有我的测试,这是很多我很抱歉,但没有一切,我认为这是没有意义的,它是不可能的帮助我:/)。

我几乎没有改变文档给出的测试,只是增加了一些输出来测试,我也删除了客户端的投票(我认为问题来自这里,因为它阻止了无限循环,甚至认为有超时)。

#include <vector> 
    #include <thread> 
    #include <memory> 
    #include <functional> 


    #include <zmq.h> 
    #include <zmq.hpp> 
    #include <zhelper.hpp> 

    // This is our client task class. 
    // It connects to the server, and then sends a request once per second 
    // It collects responses as they arrive, and it prints them out. We will 
    // run several client tasks in parallel, each with a different random ID. 
    // Attention! -- this random work well only on linux. 

    class client_task { 
    public: 
     client_task() 
      : ctx_(1), 
       client_socket_(ctx_, ZMQ_DEALER) 
     {} 

     void start() { 
      // generate random identity 
      char identity[10] = {}; 
      sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000)); 
      printf("-> %s\n", identity); 
      client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity)); 
      client_socket_.connect("tcp://localhost:5570"); 

      zmq_pollitem_t items; 
      items.socket = &client_socket_; 
      items.fd = 0; 
      items.events = ZMQ_POLLIN; 
      items.revents = 0; 

      int request_nbr = 0; 
      try { 
       while (true) { 

        for (int i = 0; i < 100; ++i) { 

         // 10 milliseconds 
         sleep(1); 
         std::cout << "ici" << std::endl; 
         if (items.revents & ZMQ_POLLIN) { 
          printf("\n%s ", identity); 
          s_dump(client_socket_); 
         } 

         char request_string[16] = {}; 
         sprintf(request_string, "request #%d", ++request_nbr); 
         client_socket_.send(request_string, strlen(request_string)); 

        } 
       } 

      } 
      catch (std::exception &e) 
      {} 
     } 

    private: 
     zmq::context_t ctx_; 
     zmq::socket_t client_socket_; 
    }; 

    // Each worker task works on one request at a time and sends a random number 
    // of replies back, with random delays between replies: 

    class server_worker { 
    public: 
     server_worker(zmq::context_t &ctx, int sock_type) 
      : ctx_(ctx), 
       worker_(ctx_, sock_type) 
     {} 

     void work() { 
       worker_.connect("inproc://backend"); 

      try { 
       while (true) { 
        zmq::message_t identity; 
        zmq::message_t msg; 
        zmq::message_t copied_id; 
        zmq::message_t copied_msg; 
        worker_.recv(&identity); 
        worker_.recv(&msg); 
        std::cout << "I never arrive here" << std::endl; 

        int replies = within(5); 
        for (int reply = 0; reply < replies; ++reply) { 
         std::cout << "LA" << std::endl; 
         s_sleep(within(1000) + 1); 
         copied_id.copy(&identity); 
         copied_msg.copy(&msg); 
         worker_.send(copied_id, ZMQ_SNDMORE); 
         worker_.send(copied_msg); 
        } 
       } 
      } 
      catch (std::exception &e) {} 
     } 

    private: 
     zmq::context_t &ctx_; 
     zmq::socket_t worker_; 
    }; 

    // This is our server task. 
    // It uses the multithreaded server model to deal requests out to a pool 
    // of workers and route replies back to clients. One worker can handle 
    // one request at a time but one client can talk to multiple workers at 
    // once. 

    class server_task { 
    public: 
     server_task() 
      : ctx_(1), 
       frontend_(ctx_, ZMQ_ROUTER), 
       backend_(ctx_, ZMQ_DEALER) 
     {} 

     void run() { 
      frontend_.bind("tcp://*:5570"); 
      backend_.bind("inproc://backend"); 

      server_worker * worker = new server_worker(ctx_, ZMQ_DEALER); 
      std::thread worker_thread(std::bind(&server_worker::work, worker)); 
      worker_thread.detach(); 

      try { 
       zmq::proxy(&frontend_, &backend_, NULL); 
      } 
      catch (std::exception &e) {} 

     } 

    private: 
     zmq::context_t ctx_; 
     zmq::socket_t frontend_; 
     zmq::socket_t backend_; 
    }; 

    // The main thread simply starts several clients and a server, and then 
    // waits for the server to finish. 

    int main (void) 
    { 
     client_task ct1; 
     client_task ct2; 
     client_task ct3; 
     server_task st; 

     std::thread t1(std::bind(&client_task::start, &ct1)); 
     std::thread t2(std::bind(&client_task::start, &ct2)); 
     std::thread t3(std::bind(&client_task::start, &ct3)); 
     std::thread t4(std::bind(&server_task::run, &st)); 

     t1.detach(); 
     t2.detach(); 
     t3.detach(); 
     t4.detach(); 
     std::cout << "ok" << std::endl; 
     getchar(); 
     std::cout << "ok" << std::endl; 
     return 0; 
    } 

输出我从这个代码得到的是以下几点:

-> CC66-C879 
-> 3292-E961 
-> C4AA-55D1 
ok 
ici 
ici 
ici 
... (infinite ici) 

我真的不明白为什么这是行不通的。 客户端的轮询在非套接字上发送异常套接字操作。 我面临的主要问题是这是来自官方文档的测试,我无法使其工作。关于我使用套接字的问题是什么?

感谢您的帮助

回答

1

我发现了这个问题。

还有就是官方文档中的问题(一些明显的错误,如zmq_pollitem_t数组的初始化),另一种是由我的测试不能正常工作。

对于ZMQ ::投票或ZMQ ::代理,你需要投在空插座结构*您必须不插座上使用的指针。 ZMQ poll not working

这些修改后它的工作。我做了另一篇文章解释为什么here

这里是没有我的其它附加测试输出校正代码:

 // Asynchronous client-to-server (DEALER to ROUTER) 
    // 
    // While this example runs in a single process, that is to make 
    // it easier to start and stop the example. Each task has its own 
    // context and conceptually acts as a separate process. 

    #include <vector> 
    #include <thread> 
    #include <memory> 
    #include <functional> 


    #include <zmq.h> 
    #include <zmq.hpp> 
    #include <zhelper.hpp> 

    // This is our client task class. 
    // It connects to the server, and then sends a request once per second 
    // It collects responses as they arrive, and it prints them out. We will 
    // run several client tasks in parallel, each with a different random ID. 
    // Attention! -- this random work well only on linux. 

    class client_task { 
    public: 
     client_task() 
      : ctx_(1), 
       client_socket_(ctx_, ZMQ_DEALER) 
     {} 

     void start() { 
      // generate random identity 
      char identity[10] = {}; 
      sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000)); 
      printf("-> %s\n", identity); 
      client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity)); 
      client_socket_.connect("tcp://localhost:5555"); 

      zmq_pollitem_t items[1];     
      items[0].socket = static_cast<void *> (client_socket_); 
      items[0].fd = 0; 
      items[0].events = ZMQ_POLLIN; 
      items[0].revents = 0; 
      int request_nbr = 0; 
      try { 
       while (true) { 
        for (int i = 0 ; i < 100; ++i) { 

        zmq::poll(items, 1, 10); 
        if (items[0].revents & ZMQ_POLLIN) { 
          printf("\n%s =>", identity); 
          s_dump(client_socket_); 
         } 
        } 

        char request_string[16] = {}; 
        sprintf(request_string, "request #%d", ++request_nbr); 
        client_socket_.send(request_string, strlen(request_string)); 

       } 

      } 
      catch (std::exception &e) 
      { 
       std::cout << "exception : " << zmq_errno() << " "<< e.what() << std::endl; 
       if (zmq_errno() == EINTR) 
        std::cout << "lol"<< std::endl; 
      } 
     } 

    private: 
     zmq::context_t ctx_; 
     zmq::socket_t client_socket_; 
    }; 

    // Each worker task works on one request at a time and sends a random number 
    // of replies back, with random delays between replies: 

    class server_worker { 
    public: 
     server_worker(zmq::context_t &ctx, int sock_type) 
      : ctx_(ctx), 
       worker_(ctx_, sock_type) 
     {} 

     void work() { 
       worker_.connect("inproc://backend"); 

      try { 
       while (true) { 
        zmq::message_t identity; 
        zmq::message_t msg; 
        zmq::message_t copied_id; 
        zmq::message_t copied_msg; 
        worker_.recv(&identity); 
        worker_.recv(&msg); 

        int replies = within(5); 
        for (int reply = 0; reply < replies; ++reply) { 
         s_sleep(within(1000) + 1); 
         copied_id.copy(&identity); 
         copied_msg.copy(&msg); 
         worker_.send(copied_id, ZMQ_SNDMORE); 
         worker_.send(copied_msg); 
        } 
       } 
      } 
      catch (std::exception &e) 
      { 
       std::cout << "Error in worker : " << e.what() << std::endl; 
      } 
     } 

    private: 
     zmq::context_t &ctx_; 
     zmq::socket_t worker_; 
    }; 

    // This is our server task. 
    // It uses the multithreaded server model to deal requests out to a pool 
    // of workers and route replies back to clients. One worker can handle 
    // one request at a time but one client can talk to multiple workers at 
    // once. 

    class server_task { 
    public: 
     server_task() 
      : ctx_(1), 
       frontend_(ctx_, ZMQ_ROUTER), 
       backend_(ctx_, ZMQ_DEALER) 
     {} 

     void run() { 
      frontend_.bind("tcp://*:5555"); 
      backend_.bind("inproc://backend"); 

      server_worker * worker = new server_worker(ctx_, ZMQ_DEALER); 
      std::thread worker_thread(std::bind(&server_worker::work, worker)); 
      worker_thread.detach(); 

      try { 
       zmq::proxy(static_cast<void *>(frontend_), static_cast<void *> (backend_), NULL); 
      } 
      catch (std::exception &e) 
      { 
       std::cout << "Error in Server : " << e.what() << std::endl; 
      } 

     } 

    private: 
     zmq::context_t ctx_; 
     zmq::socket_t frontend_; 
     zmq::socket_t backend_; 
    }; 

    // The main thread simply starts several clients and a server, and then 
    // waits for the server to finish. 

    int main (void) 
    { 
     client_task ct1; 
     client_task ct2; 
     client_task ct3; 
     server_task st; 

     std::thread t4(std::bind(&server_task::run, &st)); 
     t4.detach(); 
     std::thread t1(std::bind(&client_task::start, &ct1)); 
     std::thread t2(std::bind(&client_task::start, &ct2)); 
     std::thread t3(std::bind(&client_task::start, &ct3)); 

     t1.detach(); 
     t2.detach(); 
     t3.detach(); 

     getchar(); 
     return 0; 
    }