嘿,大家好。我一直在研究一些代码,以便在需要时产生线程,但是决定一个更简单和更有效的解决方案是创建一个线程池。它使用我有条件等待排队和出队的队列来实现。我发布这个的唯一原因是因为我在我的代码中随机地得到了一些奇怪的错误,这些错误是在切换到线程池之前从未发生过的,当我添加了一些调试打印语句时,这些错误消失了。如果我的代码因为打印语句而开始工作,这听起来像是内存问题,堆栈可能由一些不好的线程代码引起。ThreadPool最佳实践,正确性
我觉得首先要看看线程池中的正确性和线程安全性。这是3个主要功能。 Threadstart是每个线程等待出队的函数,以及产生线程的init函数。最后一个是排队工作项目。 q_enq函数将唤醒某些线程然后出列的条件变量。
void *
threadstart(void *arg)
{
threadpool_t * tp = (threadpool_t*)arg;
while (1)
{
workitem_t *work = q_dq(tp->workqueue);
if (work == NULL)
break;
(*work->action)(work->arg);
free(work);
}
pthread_exit(NULL);
};
threadpool_t *
threadpool_init(int max_threads, int max_workload)
{
threadpool_t *tp;
pthread_attr_t attr;
register int i=0;
int rc =0;
ASSERT(max_threads > 0 && max_workload > 0);
tp = malloc(sizeof(threadpool_t));
tp->max_threads = max_threads;
tp->threads = calloc(max_threads, sizeof(pthread_t));
tp->workqueue = q_init(max_workload);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_attr_setschedpolicy(&attr, SCHED_RR);
for (; i < max_threads; i++)
{
rc = pthread_create(&tp->threads[i], &attr, threadstart, tp);
/* worry about errors creating threads later :(*/
if (rc) printf("Error creating threadpool thread %d [%d]\r\n", i, rc);
}
pthread_attr_destroy(&attr);
return tp;
}
void
threadpool_q_workitem(threadpool_t *tp, action_f action, void *arg)
{
workitem_t *item;
ASSERT(tp != NULL);
item = malloc(sizeof(workitem_t));
item->action = action;
item->arg = arg;
q_enq(tp->workqueue, (void*)item);
};
编辑:队列功能
void q_enq(struct queue *q, void *data) {
struct timeval now;
struct timespec timeout;
pthread_mutex_lock(q->mut);
while (q->full) {
gettimeofday(&now, (struct timezone *)0);
timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
timeout.tv_nsec = now.tv_usec * 1000;
pthread_cond_timedwait(q->notfull, q->mut, &timeout);
}
q->buffer[q->tail++] = data;
if (q->tail == q->num) {
q->tail = 0;
}
if (q->head == q->tail) {
q->full = 1;
}
q->empty = 0;
pthread_mutex_unlock(q->mut);
pthread_cond_signal(q->notempty);
}
void *q_dq(struct queue *q) {
void *data;
int rc;
struct timeval now;
struct timespec timeout;
pthread_mutex_lock(q->mut);
while (q->empty) {
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
timeout.tv_nsec = now.tv_usec * 1000;
if (q->finished) {
pthread_mutex_unlock(q->mut);
return NULL;
}
rc = pthread_cond_timedwait(q->notempty, q->mut, &timeout);
if (q->finished) {
pthread_mutex_unlock(q->mut);
return NULL;
}
}
data = q->buffer[q->head++];
if (q->head == q->num) {
q->head = 0;
}
if (q->head == q->tail) {
q->empty = 1;
}
q->full = 0;
pthread_mutex_unlock(q->mut);
pthread_cond_signal(q->notfull);
return data;
}
请注意q_enq和q_dq函数,因为它必须具有锁定功能。哪一个是最可能的问题。 – sfossen 2009-02-24 15:28:27