2017-08-15 39 views
0

我在Windows上为一个生产者和一个消费者在共享内存中实现了一个进程间消息队列。如何保护进程间生产者消费者消息传递机制免受因一方崩溃而导致的损坏?

我使用一个命名信号量来算空槽,一个信号灯入选计数满插槽和一个命名的mutex来保护共享内存中的数据结构。

考虑,例如消费者方面。生产者方面是相似的。 首先它等待对充分旗语然后(1)它需要由互斥下的队列中的消息,然后它用信号空信号(2)

问题:

如果消费者进程在(1)和(2)之间崩溃,那么有效的进程可以使用的队列中的槽数减1。 假设当消费者宕机时,生产者可以处理队列被填满。 (它可以指定等待空信号量时的超时时间,或者甚至可以指定0表示不等待)。

当消费者重新启动时,它可以继续从队列中读取数据。数据不会被溢出,但即使它清空了所有的全部插槽后,制作者也会有一个空闲插槽可供使用。

多个这样的重新启动之后的队列将没有槽,可用于,没有消息可以被发送。

问:

怎么能这种情况是可以避免或返还?

+0

你最好跟踪共享内存块中队列的状态,并使用事件而不是信号量。我无法给出更具体的建议;您对现有算法的描述过于模糊。 (队列是一个环形缓冲区,或FIFO,或者什么?当消费者在崩溃后重新启动时,它如何知道首先要读取哪个插槽?) –

+0

在共享内存中管理的数据结构的细节在这里并不重要。对于这个讨论,假定一个带有两个偏移量的环形缓冲区,一个用于读取一个指向恒定大小的槽的写入。生产者仅使用和更改写入偏移量和仅使用者使用者,并更改读取偏移量。由于偏移量存储在共享内存中,重新启动的消费者简单在崩溃之前读取偏移量的位置处拾取。由于消费者没有释放写信号量,因此没有生产者超支的风险。 –

+0

由于队列的状态部分保持在信号量的状态,因此您提议跟踪共享内存中的队列状态不起作用。如果消费者在完整的信号量下等待并且没有在发生崩溃之前发出空信号量的信号,则完整项目的计数减少,但完整信号量的计数没有增加。 不会发生数据溢出,但生产者现在对队列中元素的总数有不正确的看法。 –

回答

1

这里有一个简单的方法大纲,使用事件,而不是信号灯:

DWORD increment_offset(DWORD offset) 
{ 
    offset++; 
    if (offset == QUEUE_LENGTH*2) offset = 0; 
    return offset; 
} 

void consumer(void) 
{ 
    for (;;) 
    { 
     DWORD current_write_offset = InterlockedCompareExchange(write_offset, 0, 0); 

     if ((current_write_offset != *read_offset + QUEUE_LENGTH) && 
      (current_write_offset + QUEUE_LENGTH != *read_offset)) 
     { 
      // Queue is not full, make sure producer is awake 
      SetEvent(signal_producer_event); 
     } 

     if (*read_offset == current_write_offset) 
     { 
      // Queue is empty, wait for producer to add a message 
      WaitForSingleObject(signal_consumer_event, INFINITE); 
      continue; 
     } 

     MemoryBarrier(); 
     _ReadWriteBarrier; 

     consume((*read_offset) % QUEUE_LENGTH); 

     InterlockedExchange(read_offset, increment_offset(*read_offset)); 
    } 
} 

void producer(void) 
{ 
    for (;;) 
    { 
     DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0); 

     if (current_read_offset != *write_offset) 
     { 
      // Queue is not empty, make sure consumer is awake 
      SetEvent(signal_consumer_event); 
     } 

     if ((*write_offset == current_read_offset + QUEUE_LENGTH) || 
      (*write_offset + QUEUE_LENGTH == current_read_offset)) 
     { 
      // Queue is full, wait for consumer to remove a message 
      WaitForSingleObject(signal_producer_event, INFINITE); 
      continue; 
     } 

     produce((*write_offset) % QUEUE_LENGTH); 

     MemoryBarrier(); 
     _ReadWriteBarrier; 

     InterlockedExchange(write_offset, increment_offset(*write_offset)); 
    } 
} 

注:

  • 代码张贴编译(给予适当的声明),但我有没有其他测试它。

  • read_offset是共享内存中指向DWORD的指针,指示应从下一个读取哪个插槽。类似地,write_offset指向共享存储器中的DWORD,指示下一个应写入哪个插槽。

  • QUEUE_LENGTH + x的偏移量指的是与x的偏移量相同的时隙,以消除完整队列和空队列之间的歧义。这就是为什么increment_offset()函数检查QUEUE_LENGTH*2而不仅仅是QUEUE_LENGTH,以及为什么我们调用consume()produce()函数时取模。 (这种方法的一种替代方法是修改生产者从不使用最后一个可用插槽,但这会浪费一个插槽。)

  • signal_consumer_eventsignal_producer_event必须是自动重置事件。请注意,设置已设置的事件是无操作的。

  • 如果队列实际上是空的,消费者仅等待其事件,并且如果队列实际上已满,则生产者仅等待其事件。

  • 当任一进程被唤醒时,它必须重新检查队列的状态,因为存在可能导致虚假唤醒的争用条件。

  • 因为我使用互锁操作,并且因为一次只有一个进程正在使用任何特定的插槽,所以不需要互斥锁。我已经包含了内存障碍,以确保生产者写入插槽的更改将被消费者看到。如果您不熟悉无锁代码,您会发现将所显示的算法转换为使用互斥锁并不重要。

  • 请注意,InterlockedCompareExchange(pointer, 0, 0);看起来有点复杂,但它只是一个线程安全等价于*pointer,即它读取指针处的值。同样,InterlockedExchange(pointer, value);*pointer = value;相同,但是线程安全。根据编译器和目标体系结构的不同,互锁操作可能不是绝对必要的,但性能影响可以忽略不计,因此我建议您进行防御性编程。

考虑消费者在致电consume()函数期间(或之前)崩溃的情况。当消费者重新启动时,它会再次选取相同的消息并按正常方式处理。就制作人而言,没有什么不寻常的事情发生,除了消息比平时处理时间更长。如果生产者在创建消息时崩溃,则会出现类似的情况;重新启动时,生成的第一条消息将覆盖不完整的消息,并且消费者不会受到影响。显然,如果崩溃发生在调用InterlockedExchange之后,但在生产者或消费者调用SetEvent之前发生,并且如果队列先前是空的或已满的,那么另一个进程将不会在那个时候被唤醒点。但是,一旦崩溃的进程重新启动,它就会被唤醒。您不能在队列中丢失插槽,并且进程无法死锁。

我认为简单的多生产单消费的情况会是这个样子:

void producer(void) 
{ 
    for (;;) 
    { 
     DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0); 

     if (current_read_offset != *write_offset) 
     { 
      // Queue is not empty, make sure consumer is awake 
      SetEvent(signal_consumer_event); 
     } 

     produce_in_local_cache(); 

     claim_mutex(); 

     // read offset may have changed, re-read it 
     current_read_offset = InterlockedCompareExchange(read_offset, 0, 0); 

     if ((*write_offset == current_read_offset + QUEUE_LENGTH) || 
      (*write_offset + QUEUE_LENGTH == current_read_offset)) 
     { 
      // Queue is full, wait for consumer to remove a message 
      WaitForSingleObject(signal_producer_event, INFINITE); 
      continue; 
     } 

     copy_from_local_cache_to_shared_memory((*write_offset) % QUEUE_LENGTH); 

     MemoryBarrier(); 
     _ReadWriteBarrier; 

     InterlockedExchange(write_offset, increment_offset(*write_offset)); 

     release_mutex(); 
    } 
} 

如果活动制片人崩溃,如废弃的互斥体将被检测;你可以像对待这个互斥体一样正确地释放这个情况。如果崩溃的进程增加了写入偏移量,那么它添加的条目将照常进行处理;如果不是,它会被任何一个制片人声称互斥体覆盖。在任何情况下都不需要任何特殊行动。

+0

谢谢哈利,这个清楚而明确的回答。我会测试它,并在这里报告如何。 目前我有一个问题,但。这只会支持单一生产者,单一消费者吗?虽然这正是我所要求的,但如果我可以为多生产者单一消费者场景工作,那将是非常好的。 谢谢! –

+0

该算法特定于单个生产者,单个消费者。多个生产者,多个消费者看起来会完全不同,在这种情况下,环形缓冲区可能不是最好的选择。你可能想检查一本算法或其他的东西;尽管如果我有时间思考这个问题,我相信我可以想出一些有用的东西,但这可能不是最好的解决方案。 –

+0

谢谢。我环顾四周,但没有找到满足所有要求的解决方案。我为Multiple Producer Single Consumer案例发布了另一个问题(https://stackoverflow.com/questions/45768849/how-can-i-implement-a-robust-interprocess-multiple-producer-single-consumer-mess)。 –