2012-08-04 65 views
6

我负责修改同步C程序,以便它可以并行运行。目标是让它尽可能便携,因为它是许多人使用的开源程序。正因为如此,我认为最好将程序包装在C++层中,以便我可以利用便携式boost库。我已经这样做了,一切似乎都按预期工作。多线程C++消息传递

我遇到的问题是决定什么是在线程之间传递消息的最佳方法。幸运的是,该计划的架构是多生产者和单一消费者的架构。更好的是,消息的顺序并不重要。我已经读过单生产者/单消费者(SPSC)队列将受益于这种架构。那些有多线程编程经验的人有什么建议?我对这个东西很陌生。另外任何使用boost来实现SPSC的代码示例都将不胜感激。

+0

查看接受答案http://stackoverflow.com/questions/8918401/does-a-multiple-producer-single-consumer-lock-free-queue-exist-for-c – walrii 2012-08-04 01:47:54

回答

7

以下是我用于合作性多任务/多线程库(MACE)http://bytemaster.github.com/mace/的技术。除了队列为空时外,它具有无锁的好处。

struct task { 
    boost::function<void()> func; 
    task* next; 
}; 


boost::mutex      task_ready_mutex; 
boost::condition_variable  task_ready; 
boost::atomic<task*>    task_in_queue; 

// this can be called from any thread 
void thread::post_task(task* t) { 
    // atomically post the task to the queue. 
    task* stale_head = task_in_queue.load(boost::memory_order_relaxed); 
    do { t->next = stale_head; 
    } while(!task_in_queue.compare_exchange_weak(stale_head, t, boost::memory_order_release)); 

    // Because only one thread can post the 'first task', only that thread will attempt 
    // to aquire the lock and therefore there should be no contention on this lock except 
    // when *this thread is about to block on a wait condition. 
    if(!stale_head) { 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex); 
     task_ready.notify_one(); 
    } 
} 

// this is the consumer thread. 
void process_tasks() { 
    while(!done) { 
    // this will atomically pop everything that has been posted so far. 
    pending = task_in_queue.exchange(0,boost::memory_order_consume); 
    // pending is a linked list in 'reverse post order', so process them 
    // from tail to head if you want to maintain order. 

    if(!pending) { // lock scope 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex);     
     // check one last time while holding the lock before blocking. 
     if(!task_in_queue) task_ready.wait(lock); 
    } 
} 
+0

'合作':(( – 2012-08-04 07:37:31

+0

..虽然+1在每条消息中使用链接来避免队列中的消息存储 – 2012-08-04 07:52:57

+0

非常感谢。 – grouma 2012-08-04 16:55:50

1

如果只有一个消费者但是有多个生产者,那么我会使用一个数组或者一些具有O(1)访问时间的数组结构的数据结构,其中每个数组槽都代表一个生产者消费者队列。单生产者消费者队列的巨大优势在于,您可以在没有任何显式同步机制的情况下使其无锁,从而使其成为多线程环境中非常快速的数据结构。请参阅my answer here了解单生产者消费者队列的基本实现。

+0

我采用这种技术之前采用原子解决方案我遇到的问题是它没有扩展,它在缓冲区'空闲'时消耗内存,如果你不知道什么线程可能会提前发布(通用),那么你必须动态调整大小(通过锁定)或硬编码“最大”最大线程数。 – bytemaster 2012-08-04 02:34:05

+0

我正在考虑使用循环队列......这样就不需要重新调整队列的大小。 – Jason 2012-08-04 04:17:20

+0

这是可以填满的'固定大小队列',但是如果你有N个线程,那么每个线程或者需要一个输入用于所有其他N个线程,或者第一次新线程试图与另一个线程通信时,它必须分配它自己的单个生产者/单个消费者队列。这是调整大小与硬代码问题。 – bytemaster 2012-08-04 04:21:29

1

在网上有很多生产者 - 消费者队列的例子,对多个生产者/消费者是安全的。 @bytemaster发布了一个在每个消息中使用链接来消除队列类本身的存储 - 这是一个很好的方法,我自己在嵌入式作业中使用它。

在队列类必须提供存储的地方,我通常会使用大小为N的“池队列”,并在启动时加载N *个消息类实例。需要通信的线程必须从池中弹出*消息,将其加载并将其排队。当最终“用完”时,*消息被推回到池中。这限制了消息的数量,因此所有队列只需要长度为N - 不需要调整大小,不需要新建(),不需要删除(),便于泄漏检测。

+0

我简化了答案的代码,但实际上每个'任务'都存储'未知大小'的函子(以避免boost :: function <>的堆分配)。此外,我把这个任务加倍作为一个参考数据,只要未来持有它就保持活力。我最终每个任务有1个malloc,并且每秒可以推送240万个sync post/wait操作。 – bytemaster 2012-08-04 14:46:59