2014-02-07 34 views
2

我想调试发生在boost :: interprocess消息队列内的零星访问冲突。 (访问违例读取共享内存区域中的地址)。boost :: interprocess消息队列创建时的竞争条件?

环境:boost 1.54,VC++ 2010。在调试&版本构建中发生。

它总是发生在或约在message_queue.hpp线854(在接收的情况下): 评论是由我添加

 recvd_size  = top_msg.len; // top_msg points to invalid location 

或线路756(在发送的情况下)

BOOST_ASSERT(free_msg_hdr.priority == 0); // free_msg_hdr points to invalid location 

看起来好像这与消息队列创建有关。如果消息队列是“正确”创建的(即没有可能的竞争条件),则错误不会发生。 否则它可能发生在队列上的timed_receive()或timed_send()表面上随机的时间。

我想出了一个代表问题的简短例子: 不幸的是,我不能在Coliru上运行它,因为它需要两个进程。 一个必须启动没有任何参数,第二个与任何单个参数。 经过多次运行后,其中一个进程将在message_queue中崩溃。

#include <iostream> 
#include <boost/interprocess/ipc/message_queue.hpp> 
#include <boost/thread.hpp> 
#include <boost/assert.hpp> 
#include <boost/date_time.hpp> 

using namespace boost::interprocess; 
using namespace boost::posix_time; 
using boost::posix_time::microsec_clock; // microsec_clock is ambiguous between boost::posix_time and boost::interprocess. What are the odds? 

int main(int argc, wchar_t** argv) 
{ 
    while(true) 
    { 
     int proc = 0; 
     message_queue* queues[2] = {NULL, NULL}; 
     std::string names[] = {"msgq0", "msgq1"}; 
     if(1 == argc) 
     { 
      proc = 0; 
      message_queue::remove(names[0].c_str()); 
      if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; } 
      queues[0] = new message_queue(open_or_create, names[0].c_str(), 128, 10240); 

      bool bRet = false; 
      do 
      { 
       try 
       { 
        if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; } 
        queues[1]=new message_queue(open_only, names[1].c_str()); 
        bRet = true; 
       } 
       catch(const interprocess_exception&) 
       { 
        //boost::this_thread::sleep(boost::posix_time::milliseconds(2)); 
        delete queues[1]; 
        queues[1] = NULL; 
        continue; 
       } 
      }while(!bRet); 

     } 
     else 
     { 
      proc = 1; 
      message_queue::remove(names[1].c_str()); 
      if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; } 
      queues[1] = new message_queue(open_or_create, names[1].c_str(), 128, 10240); 

      bool bRet = false; 
      do 
      { 
       try 
       { 
        if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; } 
        queues[0]=new message_queue(open_only, names[0].c_str()); 
        bRet = true; 
       } 
       catch(const interprocess_exception&) 
       { 
        //boost::this_thread::sleep(boost::posix_time::milliseconds(2)); 
        delete queues[0]; 
        queues[0] = NULL; 
        continue; 
       } 
      }while(!bRet); 
     } 

     long long nCnt = 0; 
     for(int i = 0; i < 1; ++i) 
     { 
      if(proc) 
      { 
       std::string sOut; 
       sOut = "Proc1 says: Hello ProcA " + std::to_string(nCnt) + " "; 
       sOut.resize(10230, ':'); 
       for(int n = 0; n < 3; ++n) 
       { 
        queues[1]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
       } 

       bool bMessage = false; 
       for(int n = 0; n < 3; ++n) 
       { 
        size_t nRec; unsigned int nPrio; 
        std::string sIn; sIn.resize(10240); 
        bMessage = queues[0]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
        if(bMessage) 
        { 
         sIn.resize(nRec); 
         //std::cout << sIn << " "; 
        } 
       } 
       if(bMessage) 
       { 
        //std::cout << std::endl; 
       } 
      } 
      else 
      { 
       std::string sOut; 
       sOut = "Proc0 says: Hello Procccccccdadae4325a " + std::to_string(nCnt); 
       sOut.resize(10240, '.'); 
       for(int n = 0; n < 3; ++n) 
       { 
        queues[0]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
       } 

       bool bMessage = false; 
       for(int n = 0; n < 3; ++n) 
       { 
        size_t nRec; unsigned int nPrio; 
        std::string sIn; sIn.resize(10240); 
        bMessage = queues[1]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
        if(bMessage) 
        { 
         sIn.resize(nRec); 
         //std::cout << sIn << " "; 
        } 
       } 
       if(bMessage) 
       { 
        //std::cout << std::endl; 
       } 
      } 

      nCnt++; 
      boost::this_thread::sleep(boost::posix_time::milliseconds(10)); 
     } 
    } 
    return 0; 
} 

我还在想我可能做错了什么,因为我无法找到有关此问题的其他地方什么,Boost库通常是非常好的。

有没有什么我可能会做错在这个例子中的message_queue的用法?

回答

1

我不认为过程使用open_or_create是一个支持的习惯用法。您是否知道this thread on the mailing list?我无法找到更多的讨论,所以在我看来,终身管理最终没有必要被添加。

因此,您需要手动将创建与boost::interprocess同步,或者可能需要将其中一个进程重试为open_only队列,直到其他进程创建该队列。

+0

谢谢!这就是这个例子所做的。正如你所建议的,每个进程创建传出队列(open_or_create)并使用open_only旋转另一个队列。然而,只是由于你的帖子和链接的预感,我尝试了create_only而不是open_or_create。我目前正在测试这个,我会回来一点。 – namezero

+0

好吧,经过一些快速测试,create_only和open_only的catch块中的睡眠(1000)似乎可以将问题缓解到在生产应用程序中不会发生的问题。但是,在测试应用程序中,我仍然可以复制它,尽管不太经常。所以似乎有一场关于创建/打开消息队列的竞赛需要解决。我感到惊讶的是,这显然从来没有出现任何地方,虽然关于所使用的锁的讨论使我感到有点使用它的不安。 – namezero

+0

如果再发生这种情况,我可能会考虑其他IPC队列库,但现在上面提到的“解决方案”似乎能够产生可接受的结果,至少在生产应用程序中。我将把这篇文章链接到boost bugtracker;也许有人在那里有一些投入。 – namezero