2010-08-05 114 views
0

我在写一个当前从UDP接收数据的UDP服务器将其包装在一个对象中并将它们放入一个并发队列中。并发队列是这里提供的实现:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html全局静态变量的副作用

工作线程池将数据拉出队列进行处理。

队列被定义为全球:

static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_; 

现在我遇到的问题是,如果我只是写一个函数来产生数据,并将其插入到队列中,并创建一些消费者线程拉他们它工作正常。 但是,当我添加基于UDP的生产者时,工作线程停止收到队列中数据到达的通知。

我已经将问题跟踪到了concurrent_queue中推送函数的末尾。 具体而言:the_condition_variable.notify_one(); 使用我的网络代码时不返回。

所以这个问题与我写网络代码的方式有关。

下面是它的样子。

enum 
{ 
    MAX_LENGTH = 1500 
}; 


class Msg 
{ 
    public: 
    Msg() 
    { 
     static int i = 0; 
     i_ = i++; 
     printf("Construct ObbsMsg: %d\n", i_); 
    } 

    ~Msg() 
    { 
     printf("Destruct ObbsMsg: %d\n", i_); 
    } 

    const char* toString() { return data_; } 

    private: 
    friend class server; 

    udp::endpoint sender_endpoint_; 
    char data_[MAX_LENGTH]; 
    int i_; 
}; 

class server 
{ 
public: 
    server::server(boost::asio::io_service& io_service) 
    : io_service_(io_service), 
     socket_(io_service, udp::endpoint(udp::v4(), PORT)) 
    { 
    waitForNextMessage(); 
    } 

    void server::waitForNextMessage() 
    { 
    printf("Waiting for next msg\n"); 

    next_msg_.reset(new Msg()); 

    socket_.async_receive_from(
     boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_, 
     boost::bind(&server::handleReceiveFrom, this, 
        boost::asio::placeholders::error, 
        boost::asio::placeholders::bytes_transferred)); 
    } 

    void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd) 
    { 
    if (!error && bytes_recvd > 0) { 
     printf("got data: %s. Adding to work queue\n", next_msg_->toString()); 
     g_work_queue.push(next_msg_); // Add received msg to work queue 
     waitForNextMessage(); 
    } else { 
     waitForNextMessage(); 
    } 
    } 

private: 
    boost::asio::io_service& io_service_; 
    udp::socket socket_; 

    udp::endpoint sender_endpoint_; 
    boost::shared_ptr<Msg> next_msg_; 
} 

int main(int argc, char* argv[]) 
{ 
    try{ 
     boost::asio::io_service io_service; 
     server s(io_service); 
     io_service.run(); 
    catch(std::exception& e){ 
     std::err << "Exception: " << e.what() << std::endl; 
    } 
    return 0; 
} 

现在我发现如果handle_receive_from能够返回,那么concurrent_queue返回的notify_one()。所以我认为这是因为我有一个递归循环。 那么开始监听新数据的正确方法是什么?并且是异步udp服务器的例子有缺陷,因为我根据他们已经在做的事情来制定它。

编辑:好的问题刚刚变得更加怪异。

我在这里没有提到的是我有一个叫做处理器的类。 处理器看起来是这样的:

class processor 
{ 
public: 
    processor::processor(int thread_pool_size) : 
     thread_pool_size_(thread_pool_size) { } 

    void start() 
    { 
    boost::thread_group threads; 
    for (std::size_t i = 0; i < thread_pool_size_; ++i){ 
     threads.create_thread(boost::bind(&ObbsServer::worker, this)); 
    } 
    } 

    void worker() 
    { 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
    } 

private: 
    int thread_pool_size_; 
}; 

现在看来,如果我提取职工功能出在它自己的,并从主启动线程 。有用!有人可以解释为什么一个线程的功能与我在课堂以外期望的一样,但是在它里面有副作用吗?

EDIT2:现在它仍然

我掏出两个函数(完全一样)变得更加古怪。

一个被称为消费者,另一个工人。

void worker() 
{ 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     printf("waiting for msg\n"); 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
} 

void consumer() 
{ 
    while (true){ 
     boost::shared_ptr<ObbsMsg> msg; 
     printf("waiting for msg\n"); 
     g_work_queue.wait_and_pop(msg); 
     printf("Got msg: %s\n", msg->toString()); 
    } 
} 

现在,消费者住在server.cpp文件的顶部。即我们的服务器代码也在这里生存。

另一方面,工作人员住在processor.cpp文件中。

现在我暂时没有使用处理器。主要功能现在看起来是这样的:

void consumer(); 
void worker(); 

int main(int argc, char* argv[]) 
{ 
    try { 
     boost::asio::io_service io_service; 
     server net(io_service); 
     //processor s(7); 

     boost::thread_group threads; 
     for (std::size_t i = 0; i < 7; ++i){ 
      threads.create_thread(worker); // this doesn't work 
      // threads.create_thread(consumer); // THIS WORKS!?!?!? 
     } 

//  s.start(); 

     printf("Server Started...\n"); 
     boost::asio::io_service::work work(io_service); 
     io_service.run(); 

     printf("exiting...\n"); 
    } catch (std::exception& e) { 
     std::cerr << "Exception: " << e.what() << "\n"; 
    } 

    return 0; 
} 

为什么消费者能够接收排队的项目,但工人是没有的。 它们是具有不同名称的相同实现。

这没有任何意义。有任何想法吗?

下面是示例输出接收的TXT的 “Hello World” 的时候:

输出1:不工作。在调用工作者函数或使用处理器类时。

Construct ObbsMsg: 0 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
Server Started... 
waiting for msg 
got data: hello world. Adding to work queue 
Construct ObbsMsg: 1 

输出2:在调用与辅助函数相同的使用者函数时工作。

Construct ObbsMsg: 0 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
waiting for msg 
Server Started... 
waiting for msg 
got data: hello world. Adding to work queue 
Construct ObbsMsg: 1 
Got msg: hello world <----- this is what I've been wanting to see! 
Destruct ObbsMsg: 0 
waiting for msg 
+0

一个更好的名字将有助于其他人在未来找到这个。 – 2010-08-05 08:25:12

+0

谢谢,我希望现在这个名字更有意义。在这里学到了一个重要的教训。 – Matt 2010-08-05 23:12:55

回答

1

回答我自己的问题。

看来问题在于g_work_queue的声明;

在头文件中声明为:static concurrent_queue < boost :: shared_ptr> g_work_queue;

看来,声明它是静态的不是我想要做的。 显然,致力于为每个编译的.o文件,显然 单独的锁单独的队列对象等

这就解释了为什么当队列正在同一个源文件 在同一个文件中的消费者和生产者的内部操纵它工作。 但是,当在不同的文件中它并不是因为线程实际上在不同的对象上等待。

所以我重新声明了这样的工作队列。

-- workqueue.h -- 
extern concurrent_queue< boost::shared_ptr<Msg> > g_work_queue; 

-- workqueue.cpp -- 
#include "workqueue.h" 
concurrent_queue< boost::shared_ptr<Msg> > g_work_queue; 

这样做可以解决问题。

+0

建议您接受您的解决方案,然后:) – 2010-08-05 08:23:12