2009-02-24 31 views
4

嘿,大家好。我一直在研究一些代码,以便在需要时产生线程,但是决定一个更简单和更有效的解决方案是创建一个线程池。它使用我有条件等待排队和出队的队列来实现。我发布这个的唯一原因是因为我在我的代码中随机地得到了一些奇怪的错误,这些错误是在切换到线程池之前从未发生过的,当我添加了一些调试打印语句时,这些错误消失了。如果我的代码因为打印语句而开始工作,这听起来像是内存问题,堆栈可能由一些不好的线程代码引起。ThreadPool最佳实践,正确性

我觉得首先要看看线程池中的正确性和线程安全性。这是3个主要功能。 Threadstart是每个线程等待出队的函数,以及产生线程的init函数。最后一个是排队工作项目。 q_enq函数将唤醒某些线程然后出列的条件变量。

void * 
threadstart(void *arg) 
{ 
    threadpool_t * tp = (threadpool_t*)arg; 

    while (1) 
    { 
     workitem_t *work = q_dq(tp->workqueue); 

     if (work == NULL) 
      break; 

     (*work->action)(work->arg); 
     free(work); 
    } 

    pthread_exit(NULL); 
}; 

threadpool_t * 
threadpool_init(int max_threads, int max_workload) 
{ 
    threadpool_t *tp; 
    pthread_attr_t attr; 
    register int i=0; 
    int rc =0; 
    ASSERT(max_threads > 0 && max_workload > 0); 

    tp = malloc(sizeof(threadpool_t)); 
    tp->max_threads = max_threads; 
    tp->threads = calloc(max_threads, sizeof(pthread_t)); 
    tp->workqueue = q_init(max_workload); 

    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 
    pthread_attr_setschedpolicy(&attr, SCHED_RR); 

    for (; i < max_threads; i++) 
    { 
     rc = pthread_create(&tp->threads[i], &attr, threadstart, tp); 

     /* worry about errors creating threads later :(*/ 
     if (rc) printf("Error creating threadpool thread %d [%d]\r\n", i, rc); 
    } 
    pthread_attr_destroy(&attr); 

    return tp; 
} 
void 
threadpool_q_workitem(threadpool_t *tp, action_f action, void *arg) 
{ 
    workitem_t *item; 
    ASSERT(tp != NULL); 

    item = malloc(sizeof(workitem_t)); 
    item->action = action; 
    item->arg = arg; 
    q_enq(tp->workqueue, (void*)item); 
}; 

编辑:队列功能

void q_enq(struct queue *q, void *data) { 
    struct timeval now; 
    struct timespec timeout; 

    pthread_mutex_lock(q->mut); 

    while (q->full) { 
     gettimeofday(&now, (struct timezone *)0); 
     timeout.tv_sec = now.tv_sec + Q_TIMEOUT; 
     timeout.tv_nsec = now.tv_usec * 1000; 

     pthread_cond_timedwait(q->notfull, q->mut, &timeout); 

    } 
    q->buffer[q->tail++] = data; 
    if (q->tail == q->num) { 
     q->tail = 0; 
    } 
    if (q->head == q->tail) { 
     q->full = 1; 
    } 
    q->empty = 0; 

    pthread_mutex_unlock(q->mut); 
    pthread_cond_signal(q->notempty); 
} 

void *q_dq(struct queue *q) { 
    void *data; 
    int rc; 
    struct timeval now; 
    struct timespec timeout; 

    pthread_mutex_lock(q->mut); 

    while (q->empty) { 
     gettimeofday(&now, NULL); 
     timeout.tv_sec = now.tv_sec + Q_TIMEOUT; 
     timeout.tv_nsec = now.tv_usec * 1000; 
     if (q->finished) { 
      pthread_mutex_unlock(q->mut); 
      return NULL; 
     } 

     rc = pthread_cond_timedwait(q->notempty, q->mut, &timeout); 
     if (q->finished) { 
      pthread_mutex_unlock(q->mut); 
      return NULL; 
     } 
    } 
    data = q->buffer[q->head++]; 
    if (q->head == q->num) { 
     q->head = 0; 
    } 
    if (q->head == q->tail) { 
     q->empty = 1; 
    } 
    q->full = 0; 
    pthread_mutex_unlock(q->mut); 
    pthread_cond_signal(q->notfull); 

    return data; 
} 
+0

请注意q_enq和q_dq函数,因为它必须具有锁定功能。哪一个是最可能的问题。 – sfossen 2009-02-24 15:28:27

回答

6

我认为你应该做这样的事情:

void q_enq(struct queue *q, void *data) { 
    int next; 

    // wait until there's room 
    do{ 
     pthread_mutex_lock(q->mut); 

     next = q->tail + 1; 
     if(next == q->num) { 
      next = 0; 
     } 

     // still room to add 
     if(q->head != next) 
      break; 

     pthread_mutex_unlock(q->mut); 
     sched_yield(); 
    } while(1); 

    q->tail = next; 
    q->buffer[ q->tail ] = data; 

    // signal consumer and unlock mutex 
    pthread_cond_signal(q->notempty); 
    pthread_mutex_unlock(q->mut); 
} 

void *q_dq(struct queue *q) { 
    void *data; 
    int rc; 

    pthread_mutex_lock(q->mut); 

    // while empty wait 
    while(q->tail == q->head){ 
     pthread_cond_wait(q->notempty, q->mut); 
    } 

    // get next and wrap 
    data = q->buffer[q->head++]; 
    if (q->head == q->num) { 
     q->head = 0; 
    } 

    pthread_mutex_unlock(q->mut); 
    return data; 
} 
+0

今天晚些时候我会试试看。我正在运行一些东西,没有时间重建... – 2009-02-24 16:27:49

+0

你有机会尝试它吗?还没有 – sfossen 2009-02-25 21:47:04

1

你打电话pthread_attr_destroy太早;请不要致电pthread_attr_destroy,直到其销毁池(即保持线程的使用寿命)