2012-11-26 31 views
1

我已经写了一个同步队列来保存整数,并面临一个奇怪的竞争条件,我似乎无法理解。int队列与比较和交换有竞争条件

请做不是后解决方案,我知道如何修复代码并使其工作,我想知道竞争条件是什么以及为什么它不按预期工作。请帮助我了解发生了什么问题以及为什么。

首先代码的重要组成部分:

这是假定应用程序将永远不会投入更多的则缓冲器可容纳,因此没有检查当前缓冲区大小

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value) { 
    if (value) { // 0 values are not allowed to be put in 
     size_t write_offset; // holds a current copy of the array index where to put the element 
     for (;;) { 
      // retrieve up to date write_offset copy and apply power-of-two modulus 
      write_offset = int_queue->write_offset & int_queue->modulus; 
      // if that cell currently holds 0 (thus is empty) 
      if (!int_queue->int_container[write_offset]) 
       // Appetmt to compare and swap the new value in 
       if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value)) 
        // if successful then this thread was the first do do this, terminate the loop, else try again 
        break; 
     } 

     // increment write offset signaling other threads where the next free cell is 
     int_queue->write_offset++; 
     // doing a synchronised increment here does not fix the race condition 
    } 
} 

这似乎有一个罕见的竞争条件,似乎不增加write_offset。 在RedHat 2.6.32 Intel(R)Xeon(R)上测试OS X gcc 4.2,Intel Core i5 quadcore和Linux Intel C Compiler 12。两者都产生竞争条件。

完整的源测试用例:

#include <pthread.h> 

#include <stdlib.h> 
#include <stdio.h> 
#include <unistd.h> 
#include <stdint.h> 

// #include "int_queue.h" 
#include <stddef.h> 
#include <string.h> 
#include <unistd.h> 
#include <sys/mman.h> 

#ifndef INT_QUEUE_H 
#define INT_QUEUE_H 

#ifndef MAP_ANONYMOUS 
#define MAP_ANONYMOUS MAP_ANON 
#endif 

struct int_queue_s { 
    size_t size; 
    size_t modulus; 
    volatile size_t read_offset; 
    volatile size_t write_offset; 
    volatile long int int_container[0]; 
}; 

static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value) { 
    if (value) { 
     int_queue->int_container[int_queue->write_offset & int_queue->modulus] = value; 
     int_queue->write_offset++; 
    } 
} 

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value) { 
    if (value) { 
     size_t write_offset; 
     for (;;) { 
      write_offset = int_queue->write_offset & int_queue->modulus; 
      if (!int_queue->int_container[write_offset]) 
       if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value)) 
        break; 
     } 

     int_queue->write_offset++; 
    } 
} 

static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) { 
    size_t read_offset = int_queue->read_offset & int_queue->modulus; 
    if (int_queue->write_offset != int_queue->read_offset) { 
     const long int value = int_queue->int_container[read_offset]; 
     int_queue->int_container[read_offset] = 0; 
     int_queue->read_offset++; 
     return value; 
    } else 
     return 0; 
} 

static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) { 
    size_t read_offset; 
    long int volatile value; 
    for (;;) { 

     read_offset = int_queue->read_offset; 
     if (int_queue->write_offset == read_offset) 
      return 0; 
     read_offset &= int_queue->modulus; 
     value = int_queue->int_container[read_offset]; 
     if (value) 
      if (__sync_bool_compare_and_swap(&(int_queue->int_container[read_offset]), (long int)value, (long int)0)) 
       break; 
    } 
    int_queue->read_offset++; 
    return value; 
} 

static inline struct int_queue_s * int_queue_create(size_t num_values) { 

    struct int_queue_s * int_queue; 
    size_t modulus; 
    size_t temp = num_values + 1; 
    do { 
     modulus = temp; 
     temp--; 
     temp &= modulus; 
    } while (temp); 
    modulus <<= 1; 

    size_t int_queue_mem = sizeof(*int_queue) + (sizeof(int_queue->int_container[0]) * modulus); 

    if (int_queue_mem % sysconf(_SC_PAGE_SIZE)) int_queue_mem += sysconf(_SC_PAGE_SIZE) - (int_queue_mem % sysconf(_SC_PAGE_SIZE)); 

    int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE , -1, 0); 
    if (int_queue == MAP_FAILED) 
     return NULL; 

    int_queue->modulus = modulus-1; 
    int_queue->read_offset = 0; 
    int_queue->write_offset = 0; 
    int_queue->size = num_values; 

    memset((void*)int_queue->int_container, 0, sizeof(int_queue->int_container[0]) * modulus); 

    size_t i; 
    for (i = 0; i < num_values;) { 
     int_queue_put(int_queue, ++i); 
    } 

    return int_queue; 
} 


#endif 


void * test_int_queue_thread(struct int_queue_s * int_queue) { 
    long int value; 

    size_t i; 

    for (i = 0; i < 10000000; i++) { 


     int waited = -1; 
     do { 
      value = int_queue_get_sync(int_queue); 
      waited++; 
     } while (!value); 

     if (waited > 0) { 
      printf("waited %d cycles to get a new value\n", waited); 
      // continue; 
     } 

     // else { 
     printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i); 
     // } 

     int timesleep = rand(); 
     timesleep &= 0xFFF; 

     usleep(timesleep); 

     int_queue_put_sync(int_queue, value); 

     printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i); 
    } 

    return NULL; 
} 


int main(int argc, char ** argv) { 
    struct int_queue_s * int_queue = int_queue_create(2); 

    if (!int_queue) { 
     fprintf(stderr, "error initializing int_queue\n"); 
     return -1; 
    } 

    srand(0); 

    long int value[100]; 

    size_t i; 

    for (i = 0; i < 100; i++) { 
     value[0] = int_queue_get(int_queue); 

     if (!value[0]) { 
      printf("error getting value\n"); 
     } 
     else { 
      printf("got value %ld\n", value[0]); 
     } 

     int_queue_put(int_queue, value[0]); 

     printf("put value %ld back successfully\n", value[0]); 
    } 

    pthread_t threads[100]; 
    for (i = 0; i < 4; i++) { 
     pthread_create(threads + i, NULL, (void * (*)(void *))test_int_queue_thread, int_queue); 
    } 

    for (i = 0; i < 4; i++) { 
     pthread_join(threads[i], NULL); 
    } 


    return 0; 
} 
+0

究竟是什么错误症状?它挂着吗? int_queur_put_sync()循环底部的pthread_yield()是否有帮助?如果在该循环中有99个线程在旋转,那么如果调度程序不够完美,它可能会让您的一位合法编写者难以完成。 –

+0

这不是一个调度问题。我试着在16个物理内核上部署它,但它仍然处于死锁状态。就我所知,导致死锁的问题是'write_offset'错过了一个增量。 –

+1

对不起,如果这是一个愚蠢的问题,但是是什么让你认为它是write_offset()上错过的增量?这听起来像失败模式是一个僵局,但试图了解你已经做了什么来证明这一点。只是想获得上下文。 –

回答

5

有趣的问题。这是一个疯狂的猜测。 :-)

看来你需要read_offset和write_offset之间的一些同步。

例如,这里是一个可能相关或不相关的竞赛。在您的比较和交换和write_offset增量之间,您可能会有一个阅读器进入并将该值设置为零。

Writer-1: get write_offset=0 
Writer-2: get write_offset=0 
Writer-1: compare-and-swap at offset=0 
Writer-1: Set write_offset=1 
Reader-1: compare-and-swap at offset=0 (sets it back to zero) 
Writer-2: compare-and-swap at offset=0 again even though write_offset=1 
Writer-2: Set write_offset=2 
+0

不错的一个。这是一个合理的解释。几个星期以来一直在讨厌我。现在我知道为什么同步价值是一个坏主意。这将使一个令人敬畏的考试问题:) –

+0

*很*很好的接收 - 我希望我可以多次upvote。 –

+0

@SergeyL。 - 我很高兴我不在班上(可能是作为审计)。 –

0

我相信int_queue->write_offset++;的问题是:如果两个线程同时执行该指令,它们都会从内存加载相同的值,增加它,并存储相同返回结果(使变量仅增加1)。

+0

但一次只有一个线程会到达那里。为了达到那一行,一个线程需要在比较和交换上取得成功,这只有在以前没有其他线程成功的情况下才是如此。在此时,所有其他线程将循环,直到写入偏移量已由成功的线程增加并写入内存为止。 –

+0

@SergeyL:不,两个线程可能一次到达那里。比较和交换只保护实际的比较和交换操作不被多个线程同时执行;其他一切可能仍然会同时执行。一个线程可能在c-a-s中成功并退出循环,然后被调度程序换出;另一个线程(丢失了原始的c-a-s)可能会在循环中进行另一轮,并在下一个c-a-s上成功,然后第一个线程可能会继续,以使两个线程同时达到增量。 –

+0

如果一个线程在cas上成功并且在执行增量之前暂停,那么没有其他人可以执行cas。如果另一个线程在同一位置执行cas操作,cas只会交换内存中不再存在的0值。 –

-1

我看来是

int_queue->write_offset++; 

write_offset = int_queue->write_offset & int_queue->modulus; 

不是线程安全的