我在写一个当前从UDP接收数据的UDP服务器将其包装在一个对象中并将它们放入一个并发队列中。并发队列是这里提供的实现:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html全局静态变量的副作用
工作线程池将数据拉出队列进行处理。
队列被定义为全球:
static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_;
现在我遇到的问题是,如果我只是写一个函数来产生数据,并将其插入到队列中,并创建一些消费者线程拉他们它工作正常。 但是,当我添加基于UDP的生产者时,工作线程停止收到队列中数据到达的通知。
我已经将问题跟踪到了concurrent_queue中推送函数的末尾。 具体而言:the_condition_variable.notify_one(); 使用我的网络代码时不返回。
所以这个问题与我写网络代码的方式有关。
下面是它的样子。
enum
{
MAX_LENGTH = 1500
};
class Msg
{
public:
Msg()
{
static int i = 0;
i_ = i++;
printf("Construct ObbsMsg: %d\n", i_);
}
~Msg()
{
printf("Destruct ObbsMsg: %d\n", i_);
}
const char* toString() { return data_; }
private:
friend class server;
udp::endpoint sender_endpoint_;
char data_[MAX_LENGTH];
int i_;
};
class server
{
public:
server::server(boost::asio::io_service& io_service)
: io_service_(io_service),
socket_(io_service, udp::endpoint(udp::v4(), PORT))
{
waitForNextMessage();
}
void server::waitForNextMessage()
{
printf("Waiting for next msg\n");
next_msg_.reset(new Msg());
socket_.async_receive_from(
boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
boost::bind(&server::handleReceiveFrom, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
{
if (!error && bytes_recvd > 0) {
printf("got data: %s. Adding to work queue\n", next_msg_->toString());
g_work_queue.push(next_msg_); // Add received msg to work queue
waitForNextMessage();
} else {
waitForNextMessage();
}
}
private:
boost::asio::io_service& io_service_;
udp::socket socket_;
udp::endpoint sender_endpoint_;
boost::shared_ptr<Msg> next_msg_;
}
int main(int argc, char* argv[])
{
try{
boost::asio::io_service io_service;
server s(io_service);
io_service.run();
catch(std::exception& e){
std::err << "Exception: " << e.what() << std::endl;
}
return 0;
}
现在我发现如果handle_receive_from能够返回,那么concurrent_queue返回的notify_one()。所以我认为这是因为我有一个递归循环。 那么开始监听新数据的正确方法是什么?并且是异步udp服务器的例子有缺陷,因为我根据他们已经在做的事情来制定它。
编辑:好的问题刚刚变得更加怪异。
我在这里没有提到的是我有一个叫做处理器的类。 处理器看起来是这样的:
class processor
{
public:
processor::processor(int thread_pool_size) :
thread_pool_size_(thread_pool_size) { }
void start()
{
boost::thread_group threads;
for (std::size_t i = 0; i < thread_pool_size_; ++i){
threads.create_thread(boost::bind(&ObbsServer::worker, this));
}
}
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
private:
int thread_pool_size_;
};
现在看来,如果我提取职工功能出在它自己的,并从主启动线程 。有用!有人可以解释为什么一个线程的功能与我在课堂以外期望的一样,但是在它里面有副作用吗?
EDIT2:现在它仍然
我掏出两个函数(完全一样)变得更加古怪。
一个被称为消费者,另一个工人。
即
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
void consumer()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
现在,消费者住在server.cpp文件的顶部。即我们的服务器代码也在这里生存。
另一方面,工作人员住在processor.cpp文件中。
现在我暂时没有使用处理器。主要功能现在看起来是这样的:
void consumer();
void worker();
int main(int argc, char* argv[])
{
try {
boost::asio::io_service io_service;
server net(io_service);
//processor s(7);
boost::thread_group threads;
for (std::size_t i = 0; i < 7; ++i){
threads.create_thread(worker); // this doesn't work
// threads.create_thread(consumer); // THIS WORKS!?!?!?
}
// s.start();
printf("Server Started...\n");
boost::asio::io_service::work work(io_service);
io_service.run();
printf("exiting...\n");
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
为什么消费者能够接收排队的项目,但工人是没有的。 它们是具有不同名称的相同实现。
这没有任何意义。有任何想法吗?
下面是示例输出接收的TXT的 “Hello World” 的时候:
输出1:不工作。在调用工作者函数或使用处理器类时。
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
输出2:在调用与辅助函数相同的使用者函数时工作。
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Got msg: hello world <----- this is what I've been wanting to see!
Destruct ObbsMsg: 0
waiting for msg
一个更好的名字将有助于其他人在未来找到这个。 – 2010-08-05 08:25:12
谢谢,我希望现在这个名字更有意义。在这里学到了一个重要的教训。 – Matt 2010-08-05 23:12:55