2017-06-07 78 views
2

我有多个读取线程和一个写入线程。如果我在其中一个读线程上锁定了互斥体并从中发送了广播,是否可以保证互斥体将被等待在pthread_cond_wait()上的写入线程锁定,或者是否存在另一个读取线程在pthread_mutex_lock()上隐藏的可能性互斥?主要问题是pthread_cond_wait()的优先级高于pthread_mutex_lock()吗?pthread_cond_wait和pthread_mutex_lock优先级?

如果不是,我该如何实现互斥锁将始终被pthread_cond_broadcast()上的写入线程锁定?

读主题:

pthread_mutex_lock(mutex); 
pthread_cond_broadcast(cond); 
pthread_mutex_unlock(mutex); 

编写线程:

pthread_mutex_lock(&mutex); 
pthread_cond_wait(&cond, &mutex); 

回答

0

假设两个线程,读取和写入,在同一时刻到达pthread_mutex_lock。所以,要么写线程调用pthread_mutex_lock获得互斥量,要么读取线程。

如果它是写入线程,读取的将等待pthread_mutex_lock。写,通过呼吁pthread_cond_wait版本mutex和块上cond。它是以原子方式完成的。所以,当读取线程为mutex时,我们可以确定读取的线程在cond上等待。因此,广播cond到达写线程,它不再等待cond,但 - 仍在pthread_cond_wait范围 - 试图获得锁定mutex(保持为读线程)。读取线程在广播cond后释放mutex并转到写入线程。所以写线程终于从pthread_cond_wait退出,锁定了mutex。请记住稍后解锁它。

如果它是读取线程,写入将等待pthread_mutex_lock,读取将在cond上广播信号,然后释放mutex。之后,写线程获取pthread_mutex_lock上的mutex并立即释放它pthread_cond_wait等待cond(请注意,之前的cond广播对当前的pthread_cond_wait没有影响)。在读取线程的下一次迭代中,它获取锁定mutex,发送广播cond并解锁mutex。这意味着写入线程在cond上向前移动,并获取mutex上的锁定。

它是否回答您关于优先级的问题?


更新后评论。

我们假设我们有一个线程(让我们将其命名为A以供将来参考)持有锁mutex和其他几个尝试获取相同的锁。只要第一个线程释放了锁,就无法预测哪个线程会获得锁。此外,如果A线程有一个循环并尝试重新获取锁定mutex,则有可能会被授予此锁定,并且其他线程会一直等待。添加pthread_cond_wait不会更改授予锁定范围内的任何内容。

让我引用POSIX规范的片段(见https://stackoverflow.com/a/9625267/2989411供参考):

这些功能原子方式释放互斥和导致调用线程阻塞条件变量COND;原子地这里的意思是“原子上关于另一个线程访问互斥体,然后是条件变量”。也就是说,如果另一个线程能够在约程序线程释放它之后获取该互斥体,则该线程中对pthread_cond_broadcast()或pthread_cond_signal()的后续调用将表现得好像它是在about-阻塞线程被阻塞。

这只是标准中关于操作顺序的保证。为其他线程授予锁定的顺序是相当难以预测的,并且根据一些非常微妙的时间波动而变化。

仅用于互斥体相关的代码,请用一下下面的代码:

#define _GNU_SOURCE 
#include <pthread.h> 

#include <stdio.h> 
#include <unistd.h> 

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 

void *th(void *arg) { 
    int i; 
    char *s = arg; 
    for (i = 0; i < 10; ++i) { 
     pthread_mutex_lock(&mutex); 
     printf("%s %d\n", s, i); 
     //sleep(1); 
     pthread_mutex_unlock(&mutex); 
#if 0 
     pthread_yield(); 
#endif 
    } 
    return NULL; 
} 

int main() { 
    int i; 
    for (i = 0; i < 10; ++i) { 
     pthread_t t1, t2, t3; 
     printf("================================\n"); 
     pthread_create(&t1, NULL, th, "t1"); 
     pthread_create(&t2, NULL, th, "  t2"); 
     pthread_create(&t3, NULL, th, "   t3"); 
     pthread_join(t1, NULL); 
     pthread_join(t2, NULL); 
     pthread_join(t3, NULL); 
    } 
    return 0; 
} 

在一体机(单CPU),它总是显示从T3,T2则整个循环最后从T1。在另一个(2个内核)线程的顺序更随机,但几乎总是在为其他线程授予互斥之前,为每个线程显示整个循环。很少有一种情况,如:

t1 8 
t1 9 
      t3 0 
    t2 0 
    t2 1 
    [removed other t2 output] 
    t2 8 
    t2 9 
      t3 1 
      t3 2 

#if 1更换#if 0启用pthread_yield,看结果,是否有输出。对我而言,它的工作方式是两个线程交错显示输出,然后第三个线程终于有机会工作。添加另一个或多个线程。用睡觉玩等,它证实了随机行为。

如果您希望稍微做一点实验,请编译并运行以下代码片段。这是一个单一生产者的例子 - 多个消费者模型。它可以用两个参数运行:第一个是消费者线程的数量,第二个是生成的数据序列的长度。如果没有给出参数,则有一个消费者线程和120个要处理的项目。我还建议睡眠/ usleep的地方标明/* play here */:改变参数的值,在全部除去睡眠,移动它 - 在适当的时候 - 到临界区或pthread_yield更换和观察行为的变化。

#define _GNU_SOURCE 
#include <assert.h> 
#include <limits.h> 
#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <time.h> 
#include <unistd.h> 

struct data_t { 
    int seq; 
    int payload; 
    struct data_t *next; 
}; 

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 
struct data_t *first = NULL, *last = NULL; 
int in_progress = 1; 
int num_data = 120; 

void push(int seq, int payload) { 
    struct data_t *e; 
    e = malloc(sizeof(struct data_t)); 
    e->seq = seq; 
    e->payload = payload; 
    e->next = NULL; 
    if (last == NULL) { 
     assert(first == NULL); 
     first = last = e; 
    } else { 
     last->next = e; 
     last = e; 
    } 
} 

struct data_t pop() { 
    struct data_t res = {0}; 
    if (first == NULL) { 
     res.seq = -1; 
    } else { 
     res.seq = first->seq; 
     res.payload = first->payload; 
     first = first->next; 
     if (first == NULL) { 
      last = NULL; 
     } 
    } 
    return res; 
} 

void *producer(void *arg __attribute__((unused))) { 
    int i; 
    printf("producer created\n"); 
    for (i = 0; i < num_data; ++i) { 
     int val; 
     sleep(1); /* play here */ 
     pthread_mutex_lock(&mutex); 
     val = rand()/(INT_MAX/1000); 
     push(i, val); 
     pthread_mutex_unlock(&mutex); 
     pthread_cond_signal(&cond); 
     printf("prod %3d %3d signaled\n", i, val); 
    } 
    in_progress = 0; 
    printf("prod end\n"); 
    pthread_cond_broadcast(&cond); 
    printf("prod end signaled\n"); 
    return NULL; 
} 

void *consumer(void *arg) { 
    char c_id[1024]; 
    int t_id = *(int *)arg; 
    sprintf(c_id, "%*s c %02d", t_id % 10, "", t_id); 
    printf("%s created\n", c_id); 
    while (1) { 
     struct data_t item; 
     pthread_mutex_lock(&mutex); 
     item = pop(); 
     while (item.seq == -1 && in_progress) { 
      printf("%s waits for data\n", c_id); 
      pthread_cond_wait(&cond, &mutex); 
      printf("%s got signal\n", c_id); 
      item = pop(); 
     } 
     if (!in_progress && item.seq == -1) { 
      printf("%s detected end of data.\n", c_id); 
      pthread_mutex_unlock(&mutex); 
      break; 
     } 
     pthread_mutex_unlock(&mutex); 
     printf("%s processing %3d %3d\n", c_id, item.seq, item.payload); 
     sleep(item.payload % 10); /* play here */ 
     printf("%s processed %3d %3d\n", c_id, item.seq, item.payload); 
    } 
    printf("%s end\n", c_id); 
    return NULL; 
} 

int main(int argc, char *argv[]) { 
    int num_cons = 1; 
    pthread_t t_prod; 
    pthread_t *t_cons; 
    int i; 
    int *nums; 
    if (argc > 1) { 
     num_cons = atoi(argv[1]); 
     if (num_cons == 0) { 
      num_cons = 1; 
     } 
     if (num_cons > 99) { 
      num_cons = 99; 
     } 
    } 
    if (argc > 2) { 
     num_data = atoi(argv[2]); 
     if (num_data < 10) { 
      num_data = 10; 
     } 
     if (num_data > 600) { 
      num_data = 600; 
     } 
    } 

    printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data); 
    t_cons = malloc(sizeof(pthread_t) * num_cons); 
    nums = malloc(sizeof(int) * num_cons); 
    if (!t_cons || !nums) { 
     printf("Out of memory!\n"); 
     exit(1); 
    } 
    srand(time(NULL)); 
    pthread_create(&t_prod, NULL, producer, NULL); 

    for (i = 0; i < num_cons; ++i) { 
     nums[i] = i + 1; 
     usleep(100000); /* play here */ 
     pthread_create(t_cons + i, NULL, consumer, nums + i); 
    } 

    pthread_join(t_prod, NULL); 

    for (i = 0; i < num_cons; ++i) { 
     pthread_join(t_cons[i], NULL); 
    } 
    free(nums); 
    free(t_cons); 

    return 0; 
} 

我希望我已经清除你的疑虑,并给你一些代码进行试验,并获得有关并行线程行为的一些信心。

+0

感谢您的回答。无论如何,我不知道它绝对回答了我的问题。如果一个写线程正在等待pthread_cond_wait()并且我们有两个读线程,会发生什么情况。一个读线程设法锁定互斥并发送广播。释放此互斥锁后,是否有任何保证写入线程会在等待广播时锁定互斥锁(另一个读取线程正在等待同时锁定互斥锁)? – testtest1235

+0

@ testtest1235我已经用一些附加信息和示例扩展了我的答案。 – ArturFH