这里有一个简单的方法大纲,使用事件,而不是信号灯:
DWORD increment_offset(DWORD offset)
{
offset++;
if (offset == QUEUE_LENGTH*2) offset = 0;
return offset;
}
void consumer(void)
{
for (;;)
{
DWORD current_write_offset = InterlockedCompareExchange(write_offset, 0, 0);
if ((current_write_offset != *read_offset + QUEUE_LENGTH) &&
(current_write_offset + QUEUE_LENGTH != *read_offset))
{
// Queue is not full, make sure producer is awake
SetEvent(signal_producer_event);
}
if (*read_offset == current_write_offset)
{
// Queue is empty, wait for producer to add a message
WaitForSingleObject(signal_consumer_event, INFINITE);
continue;
}
MemoryBarrier();
_ReadWriteBarrier;
consume((*read_offset) % QUEUE_LENGTH);
InterlockedExchange(read_offset, increment_offset(*read_offset));
}
}
void producer(void)
{
for (;;)
{
DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
if (current_read_offset != *write_offset)
{
// Queue is not empty, make sure consumer is awake
SetEvent(signal_consumer_event);
}
if ((*write_offset == current_read_offset + QUEUE_LENGTH) ||
(*write_offset + QUEUE_LENGTH == current_read_offset))
{
// Queue is full, wait for consumer to remove a message
WaitForSingleObject(signal_producer_event, INFINITE);
continue;
}
produce((*write_offset) % QUEUE_LENGTH);
MemoryBarrier();
_ReadWriteBarrier;
InterlockedExchange(write_offset, increment_offset(*write_offset));
}
}
注:
代码张贴编译(给予适当的声明),但我有没有其他测试它。
read_offset
是共享内存中指向DWORD
的指针,指示应从下一个读取哪个插槽。类似地,write_offset
指向共享存储器中的DWORD
,指示下一个应写入哪个插槽。
QUEUE_LENGTH + x
的偏移量指的是与x
的偏移量相同的时隙,以消除完整队列和空队列之间的歧义。这就是为什么increment_offset()
函数检查QUEUE_LENGTH*2
而不仅仅是QUEUE_LENGTH
,以及为什么我们调用consume()
和produce()
函数时取模。 (这种方法的一种替代方法是修改生产者从不使用最后一个可用插槽,但这会浪费一个插槽。)
signal_consumer_event
和signal_producer_event
必须是自动重置事件。请注意,设置已设置的事件是无操作的。
如果队列实际上是空的,消费者仅等待其事件,并且如果队列实际上已满,则生产者仅等待其事件。
当任一进程被唤醒时,它必须重新检查队列的状态,因为存在可能导致虚假唤醒的争用条件。
因为我使用互锁操作,并且因为一次只有一个进程正在使用任何特定的插槽,所以不需要互斥锁。我已经包含了内存障碍,以确保生产者写入插槽的更改将被消费者看到。如果您不熟悉无锁代码,您会发现将所显示的算法转换为使用互斥锁并不重要。
请注意,InterlockedCompareExchange(pointer, 0, 0);
看起来有点复杂,但它只是一个线程安全等价于*pointer
,即它读取指针处的值。同样,InterlockedExchange(pointer, value);
与*pointer = value;
相同,但是线程安全。根据编译器和目标体系结构的不同,互锁操作可能不是绝对必要的,但性能影响可以忽略不计,因此我建议您进行防御性编程。
考虑消费者在致电consume()
函数期间(或之前)崩溃的情况。当消费者重新启动时,它会再次选取相同的消息并按正常方式处理。就制作人而言,没有什么不寻常的事情发生,除了消息比平时处理时间更长。如果生产者在创建消息时崩溃,则会出现类似的情况;重新启动时,生成的第一条消息将覆盖不完整的消息,并且消费者不会受到影响。显然,如果崩溃发生在调用InterlockedExchange
之后,但在生产者或消费者调用SetEvent
之前发生,并且如果队列先前是空的或已满的,那么另一个进程将不会在那个时候被唤醒点。但是,一旦崩溃的进程重新启动,它就会被唤醒。您不能在队列中丢失插槽,并且进程无法死锁。
我认为简单的多生产单消费的情况会是这个样子:
void producer(void)
{
for (;;)
{
DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
if (current_read_offset != *write_offset)
{
// Queue is not empty, make sure consumer is awake
SetEvent(signal_consumer_event);
}
produce_in_local_cache();
claim_mutex();
// read offset may have changed, re-read it
current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
if ((*write_offset == current_read_offset + QUEUE_LENGTH) ||
(*write_offset + QUEUE_LENGTH == current_read_offset))
{
// Queue is full, wait for consumer to remove a message
WaitForSingleObject(signal_producer_event, INFINITE);
continue;
}
copy_from_local_cache_to_shared_memory((*write_offset) % QUEUE_LENGTH);
MemoryBarrier();
_ReadWriteBarrier;
InterlockedExchange(write_offset, increment_offset(*write_offset));
release_mutex();
}
}
如果活动制片人崩溃,如废弃的互斥体将被检测;你可以像对待这个互斥体一样正确地释放这个情况。如果崩溃的进程增加了写入偏移量,那么它添加的条目将照常进行处理;如果不是,它会被任何一个制片人声称互斥体覆盖。在任何情况下都不需要任何特殊行动。
你最好跟踪共享内存块中队列的状态,并使用事件而不是信号量。我无法给出更具体的建议;您对现有算法的描述过于模糊。 (队列是一个环形缓冲区,或FIFO,或者什么?当消费者在崩溃后重新启动时,它如何知道首先要读取哪个插槽?) –
在共享内存中管理的数据结构的细节在这里并不重要。对于这个讨论,假定一个带有两个偏移量的环形缓冲区,一个用于读取一个指向恒定大小的槽的写入。生产者仅使用和更改写入偏移量和仅使用者使用者,并更改读取偏移量。由于偏移量存储在共享内存中,重新启动的消费者简单在崩溃之前读取偏移量的位置处拾取。由于消费者没有释放写信号量,因此没有生产者超支的风险。 –
由于队列的状态部分保持在信号量的状态,因此您提议跟踪共享内存中的队列状态不起作用。如果消费者在完整的信号量下等待并且没有在发生崩溃之前发出空信号量的信号,则完整项目的计数减少,但完整信号量的计数没有增加。 不会发生数据溢出,但生产者现在对队列中元素的总数有不正确的看法。 –