2015-02-09 162 views
3

我在C中编写了一个通用队列,用于各种有效载荷类型。它是一个阻塞队列,消费者线程将阻止等待队列由生产者线程填充。sem_wait上的线程阻塞导致其他线程挂起

我已经使用check隔离测试了队列代码,包括线程阻塞等待将值添加到队列中的行为。所有这些测试都通过了,但是,当将队列集成到代码的其余部分时,我遇到了第一次线程试图阻塞队列时所有其他线程挂起的情况。

具体而言,我所整合的程序是一个更大的生态系统的成员,所以有一个启动脚本来初始化程序,然后进行守护。守护程序线程然后创建几个分离的线程来执行各种功能。其中一个线程调用sem_wait并挂起所有线程,包括产生守护进程的线程。

为了确认此次调用是否是问题,我使用调试程序以非后台程序模式运行程序,该调试程序确认sem_wait挂起。在产生等待队列的线程之前,我还添加了sleep。在这种情况下,其他线程进一步前进,然后在sem_wait调用时挂起。

有问题的队列只对这一个程序可见。其引用存储为全局变量。当执行sem_wait的呼叫时,队列肯定是空的。

以下是队列代码:

//Queue.h 
#include <pthread.h> 
#include <semaphore.h> 

typedef void (*freeFunction)(void *); 

typedef struct _queueNode { 
    void *data; 
    struct _queueNode *next; 
} queueNode; 


typedef struct queue { 
    sem_t *logicalLength; 
    size_t elementSize; 

    queueNode *head; 
    queueNode *tail; 

    freeFunction freeFn; 
    pthread_mutex_t *queueLock; 
} queue_t; 

void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn); 
void queue_destroy(queue_t *queue); // Removes all elements from the queue 

int queue_size(queue_t *queue); // Returns the number of elements in the queue 

void queue_add(queue_t *queue, void *element);   // Adds to tail 
int queue_take(queue_t *queue, void *elementBuffer); // Returns/removes head, blocks if empty 


//Queue.c 
#include <stdlib.h> 
#include <string.h> 
#include <assert.h> 
#include <time.h> 

#include "Queue.h" 

void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn) { 

    assert(elementSize > 0); 
    assert(queue != NULL); 

    queue->elementSize = elementSize; 

    queue->head = NULL; 
    queue->tail = NULL; 

    queue->freeFn = freeFn; 

    queue->logicalLength = calloc(1, sizeof(sem_t)); 
    queue->queueLock = calloc(1, sizeof(pthread_mutex_t)); 

    sem_init(queue->logicalLength, 0, 0); 

    pthread_mutexattr_t attr; 
    pthread_mutexattr_init(&attr); 
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); 
    pthread_mutex_init(queue->queueLock, &attr); 

} 

void queue_destroy(queue_t *queue) { 

    assert(queue != NULL); 

    queueNode *current; 

    while(queue->head != NULL) { 

     current = queue->head; 
     queue->head = current->next; 

     if(queue->freeFn != NULL) { 

      queue->freeFn(current->data); 

     } 

     free(current->data); 
     free(current); 

    } 

    queue->head = NULL; 
    queue->tail = NULL; 

    pthread_mutex_destroy(queue->queueLock); 
    sem_destroy(queue->logicalLength); 

    free(queue->queueLock); 
    free(queue->logicalLength); 

} 

void queue_add(queue_t *queue, void *element) { 

    assert(queue != NULL); 
    assert(element != NULL); 

    pthread_mutex_lock(queue->queueLock); 

     queueNode *node = calloc(1, sizeof(queueNode)); 
     node->data = calloc(1, queue->elementSize); 

     node->next = NULL; 

     memcpy(node->data, element, queue->elementSize); 

     if(queue->head == NULL) { 

      queue->head = queue->tail = node; 

     } else { 

      queue->tail->next = node; 
      queue->tail = node; 

     } 

     sem_post(queue->logicalLength); 

    pthread_mutex_unlock(queue->queueLock); 

} 

void queue_removeNode(queue_t *queue, void *elementBuffer) { 

    pthread_mutex_lock(queue->queueLock); 

     if(queue->head == NULL) { 

      pthread_mutex_unlock(queue->queueLock); 
      return; 
     } 

     queueNode *node = queue->head; 
     memcpy(elementBuffer, node->data, queue->elementSize); 

     if(queue->head == queue->tail) 
      queue->tail = NULL; 

     queue->head = node->next; 

     if(queue->freeFn) { 

      queue->freeFn(node->data); 
     } 

     free(node->data); 
     free(node); 

    pthread_mutex_unlock(queue->queueLock); 

} 

int queue_take(queue_t *queue, void *elementBuffer) { 

    assert(queue != NULL); 
    assert(elementBuffer != NULL); 

    int result = EXIT_SUCCESS; 

    sem_wait(queue->logicalLength); 

    queue_removeNode(queue, elementBuffer); 

    return result; 
} 

下面是其显示该问题的代码:

//fei.h 
... 
#include "Queue.h" 
extern queue_t *commandQueue; 
... 

//fei.c 
#include "fei.h" 
#include "commandHandler.h" 
#include "Queue.h" 

queue_t *commandQueue; 

int main (int argc, char **argv){ 

    int debugFlag = handleOpts(argc, argv); 

    if(!debugFlag){ 
     int rc = daemonize(); 
     if(rc != 0){ 
      exit(rc); 
     } 
    } 

    rc = setConfigValues(); 
    if(rc){ 
     exit(rc); 
    } 

    queue_t *commandQueue = calloc(1, sizeof(queue_t)); 
    queue_initialize(commandQueue, sizeof(commandPack_t), commandFree); 

    if(getPortIsock() == 0){ // This is a simple config value 
     exit(EXIT_FAILURE); 
    } 

    signal(SIGPIPE, SIG_IGN); 

    pthread_t id; 
    pthread_attr_t attr; 
    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 
    pthread_create(&id, &attr, receiveCommands, NULL); 
    pthread_create(&id, &attr, processCommands, NULL); 

    if(!setSocketIsock()){ 
     exit(1); 
    } 
    while(!checkIfConnectedToSct()) 
     usleep(50000); 

    receiveCCSDSPackets(); 
    exit (0); 
} 

// commandHandler.c 
#include "Queue.h" 
#include "fei.h" 
#include "commandHandler.h" 

queue_t *commandQueue; 

void *receiveCommands(){ 

    getNewCsockConnection(); 
    connectedToSct = 1; 

    while(1){ 
     commandPack_t cmd; 
     int validCommand = getCommand(CSOCKET, &cmd); 
     if(validCommand == RECEIVE_SUCCESS){ 

     queue_add(commandQueue, &cmd); 

     } else{ 
      usleep(5000); 
     } 
    } 
    return NULL; 
} 

void *processCommands(){ 
    while(1){ 
     commandPack_t cmdToProcess; 

     /* Blocking queue */ 
     queue_take(commandQueue, &cmdToProcess); 


     switch(cmdToProcess.command){ 
      // Command processing 
     } 

     commandFree(&cmdToProcess); 
    } 
    return NULL; 
} 

receiveCommands功能是生产者线程和processCommands功能消费者线程。这些是代码库中唯一指向commandQueue的地方。虽然它是可变的,但主线程的执行很少超出setSocketIsock()条件检查。

任何洞察力是赞赏。

+0

在queue_add(),这是通常的semaphre单位解除锁定,而不是在它里面之后发布,在奥得河,以防止取线程立即成为运行,击中锁定然后再次停止,直到添加线程开始释放锁定。这不是导致你的锁定,但:( – 2015-02-09 20:24:06

+0

谢谢。这是一个很好的观点。我也知道,我应该检查'sem_wait'的输出,如果它没有成功阻塞。太糟糕了,我的问题是,它是阻止TOO成功 – nmogk 2015-02-09 21:20:18

回答

1

main(),你有这样一行:

queue_t *commandQueue = calloc(1, sizeof(queue_t)); 

这使得commandQueuemain一个局部变量。您的其他函数使用一个也称为commandQueue的全局变量。这使我认为你不打算在main中重新声明commandQueue。因此,改变上述行来此:

commandQueue = calloc(1, sizeof(queue_t)); 
+0

Ah!Well spotted! – 2015-02-10 11:44:42

+0

就是这样,简单而阴险,谢谢你的帮助。 – nmogk 2015-02-10 16:43:19