2011-10-28 28 views
2

在读完C++ 0x的原子并结合非锁定队列之后,我决定去玩它们。在C++中使用原子的乐观锁定策略和排序

这个想法是写一个生产者,乐观锁定多个消费者队列。消息不需要消耗。只要消费者读取它即可读取最后一个版本或知道读取的结果不好,跳过就没有问题。

在下面的代码中,我想到的策略失败了。由于数据乱序写入,数据被破坏。任何指示为什么这是,以及如何解决它将不胜感激。在Linux

汇编:克++ -std =的C++ 0x -o代码code.cpp -lpthread

谢谢, 丹尼斯

// 
// This features 2 threads in which the first writes to a structure 
// and the second tries to read from that with an optimistic 
// locking strategy. The data is equal to the versioning so we can 
// see if the data is corrupt or not. 
// 
// @since: 2011-10-28 
// @author: Dennis Fleurbaaij <[email protected]> 
// 

#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <stdatomic.h> 
#include <sched.h> 
#include <assert.h> 
#include <iostream> 
#include <xmmintrin.h> 

struct structure_t 
{ 
    std::atomic<unsigned int> id; 
    unsigned int data_a; 
    unsigned int data_b; 

    char _pad[ 64 - 12 ]; 
}; 

#define NUM_STRUCTURES 2 
struct structure_t structures[NUM_STRUCTURES]; 

std::atomic<size_t> current_version_index; 

volatile bool start = false; 
volatile bool run = true; 
size_t const iter_count = 10000000; 

/** 
* Write thread 
*/ 
void* writer(void*) 
{ 
    while(!start) 
     sched_yield(); 

    size_t i; 
    for(i=0 ; i<iter_count ; ++i) 
    { 
     size_t index = current_version_index.load(std::memory_order_relaxed); 
     size_t next_index = (current_version_index + 1) & NUM_STRUCTURES-1; 

     structures[next_index].data_a = i; 
     structures[next_index].data_b = i; 

     structures[next_index].id.store(i, std::memory_order_release); 

     current_version_index.store(next_index); 

     //std::cout << "Queued - id: " << i << ", index: " << next_index << std::endl; 
     //sleep(1); 
    } 

    run=false; 
} 

/** 
* Read thread 
*/ 
void* reader(void*) 
{ 
    while(!start) 
     sched_yield(); 

    unsigned int prev_id=0; 

    size_t i; 
    while(run) 
    { 
     size_t index = current_version_index.load(std::memory_order_relaxed); 
     unsigned int id = structures[index].id.load(std::memory_order_acquire); 

     if(id > prev_id) 
     { 
      unsigned int data_a = structures[index].data_a; 
      unsigned int data_b = structures[index].data_b; 

      // Re-read the data and check optimistic lock. This should be read after 
      // the lines above and should not be optimized away. 
      // 
      // This is what fails after a while: 
      // Error in data. Index: 0, id: 24097, id2: 24097, data_a: 24099, data_b: 24099 
      unsigned int id2 = structures[index].id.load(std::memory_order_acquire); 
      if(id2 > id) 
      { 
       continue; 
      } 

      if(id != id2 || 
       id != data_a || 
       id != data_b) 
      { 
       std::cerr << "Error in data. Index: " << index << ", id: " << id 
          << ", id2: " << id2 << ", data_a: " << data_a << ", data_b: " << data_b << std::endl; 

       exit(EXIT_FAILURE); 
      } 

      //std::cout << "Read. Index: " << index << ", id: " << id 
      //    << ", data_a: " << data_a << ", data_b: " << data_b << std::endl; 

      prev_id = id; 
     } 

     _mm_pause(); 
    } 
} 

/** 
* Main 
*/ 
int main (int argc, char *argv[]) 
{ 
    assert(sizeof(structure_t) == 64); 

    pthread_t write_thread, read_thread; 
    pthread_create(&write_thread, NULL, writer, (void*)NULL); 
    pthread_create(&read_thread, NULL, reader, (void*)NULL); 

    sleep(1); 

    start = 1; 

    void *status; 
    pthread_join(read_thread, &status); 
    pthread_join(write_thread, &status); 
} 
+0

我不确定这不属于codereview而不是这里。无论如何,你应该用纯英语陈述你打算实施的策略,以便将其与实施的策略进行比较。 –

+0

我希望我在前面的文字中说清楚了。除了我写的内容外,你能告诉我你在设计方面需要什么吗? “这个想法是写一个单一的生产者,乐观锁定多个消费者队列。” 也许有补充。消息不需要消耗。只要消费者读取它即可读取最后一个版本或知道读取的结果不好,跳过就没有问题。 – DennisFleurbaaij

+0

我并不熟悉C++ 11原子的速度;可能相关的问题:'truct'的填充暗示你想要结构对齐?你如何确保它对齐?这对正确性有重要意义吗? – sehe

回答

0

也许,这是一个错误:

structures[next_index].data_a = i; 
    structures[next_index].data_b = i; 

// **** The writer may be interrupted (preempted) here for a long time *** 
// at the same time the reader reads new data but old id (number of reads doesn't matter) 

    structures[next_index].id.store(i, std::memory_order_release); // too late! 

(current_version_index可能由读者从前面的步骤采取,所以竞争条件实际上是可能的)

+0

你是完全正确的!我想到了这一点,但认为我得到的结果得到的是我看到的反面(更高的id,但这显然是错误的)。 再次感谢! – DennisFleurbaaij