我已经写了一个同步队列来保存整数,并面临一个奇怪的竞争条件,我似乎无法理解。int队列与比较和交换有竞争条件
请做不是后解决方案,我知道如何修复代码并使其工作,我想知道竞争条件是什么以及为什么它不按预期工作。请帮助我了解发生了什么问题以及为什么。
首先代码的重要组成部分:
这是假定应用程序将永远不会投入更多的则缓冲器可容纳,因此没有检查当前缓冲区大小
static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value) {
if (value) { // 0 values are not allowed to be put in
size_t write_offset; // holds a current copy of the array index where to put the element
for (;;) {
// retrieve up to date write_offset copy and apply power-of-two modulus
write_offset = int_queue->write_offset & int_queue->modulus;
// if that cell currently holds 0 (thus is empty)
if (!int_queue->int_container[write_offset])
// Appetmt to compare and swap the new value in
if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
// if successful then this thread was the first do do this, terminate the loop, else try again
break;
}
// increment write offset signaling other threads where the next free cell is
int_queue->write_offset++;
// doing a synchronised increment here does not fix the race condition
}
}
这似乎有一个罕见的竞争条件,似乎不增加write_offset
。 在RedHat 2.6.32 Intel(R)Xeon(R)上测试OS X gcc 4.2,Intel Core i5 quadcore和Linux Intel C Compiler 12。两者都产生竞争条件。
完整的源测试用例:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdint.h>
// #include "int_queue.h"
#include <stddef.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#ifndef INT_QUEUE_H
#define INT_QUEUE_H
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
struct int_queue_s {
size_t size;
size_t modulus;
volatile size_t read_offset;
volatile size_t write_offset;
volatile long int int_container[0];
};
static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value) {
if (value) {
int_queue->int_container[int_queue->write_offset & int_queue->modulus] = value;
int_queue->write_offset++;
}
}
static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value) {
if (value) {
size_t write_offset;
for (;;) {
write_offset = int_queue->write_offset & int_queue->modulus;
if (!int_queue->int_container[write_offset])
if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
break;
}
int_queue->write_offset++;
}
}
static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) {
size_t read_offset = int_queue->read_offset & int_queue->modulus;
if (int_queue->write_offset != int_queue->read_offset) {
const long int value = int_queue->int_container[read_offset];
int_queue->int_container[read_offset] = 0;
int_queue->read_offset++;
return value;
} else
return 0;
}
static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) {
size_t read_offset;
long int volatile value;
for (;;) {
read_offset = int_queue->read_offset;
if (int_queue->write_offset == read_offset)
return 0;
read_offset &= int_queue->modulus;
value = int_queue->int_container[read_offset];
if (value)
if (__sync_bool_compare_and_swap(&(int_queue->int_container[read_offset]), (long int)value, (long int)0))
break;
}
int_queue->read_offset++;
return value;
}
static inline struct int_queue_s * int_queue_create(size_t num_values) {
struct int_queue_s * int_queue;
size_t modulus;
size_t temp = num_values + 1;
do {
modulus = temp;
temp--;
temp &= modulus;
} while (temp);
modulus <<= 1;
size_t int_queue_mem = sizeof(*int_queue) + (sizeof(int_queue->int_container[0]) * modulus);
if (int_queue_mem % sysconf(_SC_PAGE_SIZE)) int_queue_mem += sysconf(_SC_PAGE_SIZE) - (int_queue_mem % sysconf(_SC_PAGE_SIZE));
int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE , -1, 0);
if (int_queue == MAP_FAILED)
return NULL;
int_queue->modulus = modulus-1;
int_queue->read_offset = 0;
int_queue->write_offset = 0;
int_queue->size = num_values;
memset((void*)int_queue->int_container, 0, sizeof(int_queue->int_container[0]) * modulus);
size_t i;
for (i = 0; i < num_values;) {
int_queue_put(int_queue, ++i);
}
return int_queue;
}
#endif
void * test_int_queue_thread(struct int_queue_s * int_queue) {
long int value;
size_t i;
for (i = 0; i < 10000000; i++) {
int waited = -1;
do {
value = int_queue_get_sync(int_queue);
waited++;
} while (!value);
if (waited > 0) {
printf("waited %d cycles to get a new value\n", waited);
// continue;
}
// else {
printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i);
// }
int timesleep = rand();
timesleep &= 0xFFF;
usleep(timesleep);
int_queue_put_sync(int_queue, value);
printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i);
}
return NULL;
}
int main(int argc, char ** argv) {
struct int_queue_s * int_queue = int_queue_create(2);
if (!int_queue) {
fprintf(stderr, "error initializing int_queue\n");
return -1;
}
srand(0);
long int value[100];
size_t i;
for (i = 0; i < 100; i++) {
value[0] = int_queue_get(int_queue);
if (!value[0]) {
printf("error getting value\n");
}
else {
printf("got value %ld\n", value[0]);
}
int_queue_put(int_queue, value[0]);
printf("put value %ld back successfully\n", value[0]);
}
pthread_t threads[100];
for (i = 0; i < 4; i++) {
pthread_create(threads + i, NULL, (void * (*)(void *))test_int_queue_thread, int_queue);
}
for (i = 0; i < 4; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
究竟是什么错误症状?它挂着吗? int_queur_put_sync()循环底部的pthread_yield()是否有帮助?如果在该循环中有99个线程在旋转,那么如果调度程序不够完美,它可能会让您的一位合法编写者难以完成。 –
这不是一个调度问题。我试着在16个物理内核上部署它,但它仍然处于死锁状态。就我所知,导致死锁的问题是'write_offset'错过了一个增量。 –
对不起,如果这是一个愚蠢的问题,但是是什么让你认为它是write_offset()上错过的增量?这听起来像失败模式是一个僵局,但试图了解你已经做了什么来证明这一点。只是想获得上下文。 –