2009-01-10 72 views
2

使用POSIX线程& C++,我有一个“插入操作”,它只能一次安全地完成。pthread_join - 多线程等待

如果我有多个线程在等待使用pthread_join插入,然后在完成时产生一个新线程 。它们是否都会一次收到“线程完成”信号并产生多个插入,或者可以安全地假设首先收到“线程完成”信号的线程会产生一个阻塞其他线程的新线程来创建新线程。

/* --- GLOBAL --- */ 
pthread_t insertThread; 



/* --- DIFFERENT THREADS --- */ 
// Wait for Current insert to finish 
pthread_join(insertThread, NULL); 

// Done start a new one 
pthread_create(&insertThread, NULL, Insert, Data); 

感谢您的答复

程序基本上是一个巨大的哈希表这需要通过套接字来自客户端的请求。

每个新的客户端连接都会产生一个新线程,然后可以执行多个操作,特别是查找或插入。查询可以并行进行。但插入需要“重新组合”成一个单独的线程。你可以说查找操作可以在不产生客户端的新线程的情况下完成,但是它们可能需要一段时间才会导致服务器锁定,从而丢失新的请求。该设计试图尽可能减少系统调用和线程创建。

但现在,我知道这是不是安全的我第一次觉得我应该能凑齐东西一起

感谢

回答

3

opengroup.org on pthread_join

的多个同时呼叫的结果在pthread_join()指定相同的目标线程是不确定的。

所以,你真的不应该有几个线程加入你以前的insertThread。

首先,当您使用C++时,我推荐使用boost.thread。它们类似于POSIX线程模型,也可以在Windows上工作。它可以帮助您使用C++,即通过使功能对象更容易使用。

其次,为什么你要开始一个插入元素的新线程,当你在开始下一个元素之前必须等待前一个元素完成?似乎不是经典的多线程使用。

虽然...一个典型的解决方案是让一个工作线程从事件队列中获取作业,并将其他线程发布到事件队列中。

如果你真的只是想继续或多或少你现在的样子,你不得不这样做:

  • 创建一个条件变量,像insert_finished
  • 想要做插入的所有线程,等待条件变量。
  • 只要一个线程完成插入操作,它就会触发条件变量。
  • 由于条件变量需要一个互斥锁,你可以通知所有的等待线程,它们都希望开始插入,但是因为一次只有一个线程可以获取互斥锁,所有线程都会按顺序执行插入操作。

但是您应该注意,您的同步不是以太特殊的方式实现的。由于这被称为insert,我怀疑你想操作一个数据结构,所以你可能想要首先实现一个线程安全的数据结构,而不是共享数据结构访问和所有客户端之间的同步。我也怀疑会有更多的操作,然后只是insert,这将需要适当的同步...

2

根据单一Unix Specifcation的道:“多个同时执行的结果,在pthread_join ()指定相同的目标线程未定义。“实现单线程来获取任务的“正常方式”是设置一个条件变量(不要忘记相关的互斥量):空闲线程在pthread_cond_wait()(或pthread_cond_timedwait())中等待,当完成该工作的线程完成时,它会使用pthread_cond_signal()唤醒其中一个空闲线程。

0

其他人已经指出这有未定义的行为。我只是补充说,完成任务的最简单方法(只允许一个线程执行部分代码)是使用一个简单的互斥体 - 您需要执行该代码的线程是MUTally EXclusive,这就是互斥体到达的地方它的名字:-)

如果你需要的代码到一个特定的线程(如Java AWT)来运行,那么你需要条件变量。但是,您应该认真考虑这个解决方案是否真正有效。想象一下,如果您每秒调用一次“插入操作”10000次,您需要多少上下文切换。

0

正如你刚才提到你正在使用的哈希表与平行于插入几个看起坐,我建议你检查你是否可以使用并行哈希表。

由于精确的查找结果是不确定的,当你同时插入的元素,例如并发哈希地图可能正是你需要的。但是,我并没有在C++中使用并发散列表,但是因为它们都可以在Java中使用,所以您肯定会找到一个在C++中执行此操作的库。

0

,我发现它支持插入不锁定新查找唯一库 - Sunrise DD(我不知道它是否支持并发插入)

但是从谷歌的Sparse Hash map超过开关一倍的内存使用情况。查找应该很少发生,所以而不是试图写我自己的库 ,它结合了两者的优点,我宁愿只是在安全更改时锁定表暂停查找。

再次感谢

0

在我看来,你想序列化插入到散列表。

为此,您需要一个锁定 - 不产生新的线程。

0

从您的描述,看起来非常低效的,因为你正在重新创建要插入的东西,每次插入线程。创建线程的成本不是0。

这个问题的一个更常见的解决方案是产生一个等待队列的插入线程(即,在循环为空时位于循环休眠中)。其他线程然后将工作项添加到队列中。插入线程按照它们被添加的顺序(或者如果你想要的话,按优先级)选择队列中的项目并执行相应的操作。

你所要做的就是确保加入队列是受到保护的,这样一次只有一个线程可以访问修改实际队列,并且插入线程不会忙于等待,在队列中(请参阅条件变量)。

1

是的,因为大多数人推荐最好的方式似乎有一个工作线程从队列中读取。一些代码片段低于

pthread_t  insertThread = NULL; 
    pthread_mutex_t insertConditionNewMutex = PTHREAD_MUTEX_INITIALIZER; 
    pthread_mutex_t insertConditionDoneMutex = PTHREAD_MUTEX_INITIALIZER; 
    pthread_cond_t insertConditionNew  = PTHREAD_COND_INITIALIZER; 
    pthread_cond_t insertConditionDone  = PTHREAD_COND_INITIALIZER; 

     //Thread for new incoming connection 
     void * newBatchInsert() 
     { 
      for(each Word) 
      { 
          //Push It into the queue 
          pthread_mutex_lock(&lexicon[newPendingWord->length - 1]->insertQueueMutex); 
           lexicon[newPendingWord->length - 1]->insertQueue.push(newPendingWord); 
          pthread_mutex_unlock(&lexicon[newPendingWord->length - 1]->insertQueueMutex); 

      } 

        //Send signal to worker Thread 
        pthread_mutex_lock(&insertConditionNewMutex); 
         pthread_cond_signal(&insertConditionNew); 
        pthread_mutex_unlock(&insertConditionNewMutex); 

        //Wait Until it's finished 
        pthread_cond_wait(&insertConditionDone, &insertConditionDoneMutex); 

     } 


      //Worker thread 
      void * insertWorker(void *) 
      { 

       while(1)   
       { 

        pthread_cond_wait(&insertConditionNew, &insertConditionNewMutex); 

        for (int ii = 0; ii < maxWordLength; ++ii) 
        {     
          while (!lexicon[ii]->insertQueue.empty()) 
          { 

           queueNode * newPendingWord = lexicon[ii]->insertQueue.front(); 


           lexicon[ii]->insert(newPendingWord->word); 

           pthread_mutex_lock(&lexicon[ii]->insertQueueMutex); 
           lexicon[ii]->insertQueue.pop(); 
           pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex); 

          } 

        } 

        //Send signal that it's done 
        pthread_mutex_lock(&insertConditionDoneMutex); 
         pthread_cond_broadcast(&insertConditionDone); 
        pthread_mutex_unlock(&insertConditionDoneMutex); 

       } 

      } 

      int main(int argc, char * const argv[]) 
      { 

       pthread_create(&insertThread, NULL, &insertWorker, NULL); 


       lexiconServer = new server(serverPort, (void *) newBatchInsert); 

       return 0; 
      } 
0

理想情况下,即使它们执行不同的操作,也不希望多个线程池在单个进程中。线程的可恢复性是一个重要的体系结构定义,如果你使用C,这将导致在主线程中创建pthread_join。当然,对于C++线程池又称为ThreadFactory,其思想是保持线程原语抽象,它可以处理传递给它的任何函数/操作类型。

一个典型的例子是一个网络服务器,它具有连接池和线程池,服务连接并进一步处理它们,但都是从一个公共的线程池进程派生的。

摘要:避免PTHREAD_JOIN在主线程以外的任何地方。