2012-02-25 20 views
0

我有一个使用4个线程,以执行以下操作一个多线程的OpenCV程序:OpenCV的多线程(在Windows/.NET)延迟来自视频捕获几秒

螺纹1->调用cvQueryFrame()其从摄像机抓取的帧图像逐个并且将它们存储到std::vectorinputBuffer

螺纹2->上inputBuffer[0]进行阈值化,导致复制到另一个std::vector称为filterOutputBuffer

螺纹3->执行光流算法/吸引流在filterOutputBuffer前两个元素字段,副本导致到另一个std::vector称为ofOutputBuffer

螺纹4->显示使用cvShowImage(ofOutputBuffer[0])

所以基本上我被设想每个线程对应的第一个元素上执行该任务的图像输入向量/缓冲区,并将结果存储在相应输出向量的后面。像3名工厂工人一样在装配线上工作,然后将最终结果投入到下一个人的桶中。

我为所有缓冲区设置了互斥锁,程序工作正常,只有输出延迟了几秒钟。

我运行了一个非多线程版本的相同程序(使用一个巨大的while(true)循环),它只是偶尔出现口吃而实时运行。

为什么我的并发执行在性能上如此之慢?

下面是线程函数:

void writeBuffer() 
    { 
     cout << "Thread " << GetCurrentThreadId() << ": Capturing frame from camera!" << endl; 
     CvCapture *capture = 0; 
     IplImage *frame = 0; 
     DWORD waitResult; 

     if (!(capture = cvCaptureFromCAM(0))) 
      cout << "Cannot initialize camera!" << endl; 

     //now start grabbing frames and storing into the vector inputBuffer 
     while (true) 
     { 
      //cout << "Thread " << GetCurrentThreadId() << ": Waiting for mutex to write to input buffer!..." << endl; 
      waitResult = WaitForSingleObject(hMutex, INFINITE); 
      switch(waitResult) 
      { 
       // The thread got ownership of the mutex 
       case WAIT_OBJECT_0: 
        frame = cvQueryFrame(capture); //store the image into frame 
        if(!frame) 
        { 
         cout << "Thread " << GetCurrentThreadId() << ": Error capturing frame from camera!" << endl; 
        } 
        //cout << "Thread " << GetCurrentThreadId() << ": Getting Frame..." << endl; 
        inputBuffer.push_back(*frame); 
       break; 
       default: 
        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring mutex..." << endl; 
      } 
      if(!ReleaseMutex(hMutex)) 
      { 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing mutex..." << endl; 
      } 
      //else cout << "Thread " << GetCurrentThreadId() << ": Done writing to input buffer, Mutex Released!" << endl; 
      //signal hDoneGettingFrame 
      PulseEvent(hDoneGettingFrame); 
     } 
      cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl; 
    } 


    void opticalFlow() 
    { 
    ... 
     DWORD waitResult; 

     //start grabbing frames from the vector inputBuffer 
     cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl; 
     while(true) 
     { 
      waitResult = WaitForSingleObject(fMutex, INFINITE); 
      switch(waitResult) 
      { 
       // The thread got ownership of the mutex 
       case WAIT_OBJECT_0: 
        //grab first two frames from buffer (inputBuffer[0-1]) and process them 
        if(filterOutputBuffer.size() > 1) 
        { 
         frame1 = filterOutputBuffer[0]; 
         frame2 = filterOutputBuffer[1]; 
         filterOutputBuffer.erase(filterOutputBuffer.begin()); 
        } 
        else 
        { 
         if(!ReleaseMutex(fMutex)) 
          cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl; 
         //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl; 
         continue; 
        } 
       break; 
       default: 
        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl; 
        continue; 
      } 
      if(!ReleaseMutex(fMutex)) 
      { 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
      } 
    ... 
    //Do optical flow stuff 
    ... 
    waitResult = WaitForSingleObject(oMutex, INFINITE); 
      switch(waitResult) 
      { 
       // The thread got ownership of the mutex 
       case WAIT_OBJECT_0: 
        //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl; 
        ofOutputBuffer.push_back(*frame1_3C); 
       break; 
       default: 
        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl; 
      } 
      if(!ReleaseMutex(oMutex)) 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl; 
    } 
     cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl; 
    } 

    void filterImage() 
{ 
    DWORD waitResult; 
... 

    //start grabbing frames from the vector inputBuffer 
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl; 
    while(true) 
    { 
     waitResult = WaitForSingleObject(hMutex, INFINITE); 
     switch(waitResult) 
     { 
      // The thread got ownership of the mutex 
      case WAIT_OBJECT_0: 
       //grab first frame and then release mutex 
       if(inputBuffer.size() > 0) 
       { 
        frame = inputBuffer[0]; 
        inputBuffer.erase(inputBuffer.begin()); 
       } 
       else 
       { 
        if(!ReleaseMutex(hMutex)) 
         cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
        //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl; 
        continue; 
       } 
      break; 
      default: 
       cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl; 
       continue; 
     } 
     if(!ReleaseMutex(hMutex)) 
     { 
      cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
     } 
... 
//Tresholding Image Stuff 
... 
     //cout << "Thread " << GetCurrentThreadId() << ": Waiting to write to output buffer..." << endl; 
     waitResult = WaitForSingleObject(fMutex, INFINITE); 
     switch(waitResult) 
     { 
      // The thread got ownership of the mutex 
      case WAIT_OBJECT_0: 
       //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl; 
       filterOutputBuffer.push_back(*out); 
      break; 
      default: 
       cout << "Thread " << GetCurrentThreadId() << ": Error acquiring filter mutex..." << endl; 
     } 
     if(!ReleaseMutex(fMutex)) 
      cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl; 

    } 
} 

void displayImage() 
{ 
    DWORD waitResult; 
    IplImage final; 
    int c; 
    cvNamedWindow("Image", CV_WINDOW_AUTOSIZE); 
    //start grabbing frames from the vector ouputBuffer 
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from output buffer..." << endl; 
    while (true) 
    { 
      waitResult = WaitForSingleObject(oMutex, INFINITE); 
      switch(waitResult) 
      { 
        // The thread got ownership of the mutex 
        case WAIT_OBJECT_0: 
         if(ofOutputBuffer.size() > 0) 
         { 
          //cout << "Thread " << GetCurrentThreadId() << ": Reading output buffer..." << endl; 
          final = ofOutputBuffer[0]; 
          ofOutputBuffer.erase(ofOutputBuffer.begin()); 
         } 
         else 
         { 
          if(!ReleaseMutex(oMutex)) 
           cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl; 
          //else cout << "Thread " << GetCurrentThreadId() << ": Output Buffer is empty!" << endl; 
          continue; 
         } 
        break; 
        default: 
         cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl; 
         continue; 
      } 
      if(!ReleaseMutex(oMutex)) 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
      //else cout << "Thread " << GetCurrentThreadId() << ": Done reading output buffer, mutex Released!" << endl; 

      //cout << "Thread " << GetCurrentThreadId() << ": Displaying Image..." << endl; 
      cvShowImage("Image", &final); 
      c = cvWaitKey(1); 
    } 
    cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl; 
} 

这里的主要功能是:

void main() 
{ 
    hMutex = CreateMutex(NULL, FALSE, NULL); 
    oMutex = CreateMutex(NULL, FALSE, NULL); 
    fMutex = CreateMutex(NULL, FALSE, NULL); 

    hDoneGettingFrame = CreateEvent(NULL, TRUE, FALSE, NULL); 
    hDoneReadingFrame = CreateEvent(NULL, TRUE, FALSE, NULL); 

    TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID); 
    TName[1]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)filterImage, NULL, 0, &ThreadID); 
    TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID); 
    TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID); 
    WaitForMultipleObjects(4, TName, TRUE, INFINITE); 
    CloseHandle(TName); 
} 

回答

0

信号量做到了!我不是使用单独的互斥体,而是创建了一个信号量,让所有线程都通过它。

谢谢,它现在运行的很快,很顺利!

void main() 
{ 
    hSemaphore = CreateSemaphore( 
     NULL,   // default security attributes 
     MAX_THREADS, // available count (when a thread enters, it decreases) 
     MAX_THREADS, // maximum count 
     NULL);   // unnamed semaphore 

    if (hSemaphore == NULL) 
    { 
     printf("CreateSemaphore error: %d\n", GetLastError()); 
     return; 
    } 


    TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID); 
    TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID); 
    TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID); 
    WaitForMultipleObjects(4, TName, TRUE, INFINITE); 
    CloseHandle(TName); 
} 

而在线程...

//instead of separate Mutexes, just wait for semaphore 
    waitResult = WaitForSingleObject(hSemaphore, INFINITE); 
       switch(waitResult) 
       { 

          ... 

          } 
       if(!ReleaseSemaphore(hSemaphore, 1, NULL)) 
        cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
+0

另外,我组合filterImage线程到opticalFlow线程 – Josh 2012-02-29 02:05:47

+0

不知道你做了什么,在这里。每个线程间队列都需要自己的信号量来计数队列项和自己的互斥量,以保护队列不受多个访问的影响。如果有5个线程,它们之间有4个阶段,则需要4个队列,4个信号量和4个互斥量。 – 2012-03-02 08:54:35

+0

我创建了一个信号量,并且所有线程都进入工作状态,然后离开。如果我必须使用互斥体,信号量的目的究竟是什么? – Josh 2012-03-09 02:48:55

0

多线程应用程序共享线程之间的CPU时间。因此,当另一个线程想要处于运行状态时存在上下文切换。可能在线程之间切换会增加CPU时间,导致应用程序变慢。

0

尝试使用threadPool可以最大限度地减少CPU在线程之间传输的时间。

+0

也许,如果我移动阈值和光流到同一个线程将最大限度地减少上下文切换? – Josh 2012-02-25 21:09:58

0

好了,下手,如果我摇你的第一个线程函数的有点绕环路:

if(!ReleaseMutex(hMutex)){} 
PulseEvent(hDoneGettingFrame); 
waitResult = WaitForSingleObject(hMutex, INFINITE); 

换一种方式,你的第一个线程持有到队列互斥几乎整个运行的第一个线程循环,所以阻止第二个线程到达任何地方。我猜所有其他线程的代码都是一样的吗?

当在一个流水线中推送生产者 - 消费者队列中的数据时,这个想法是你应该保持队列锁定的时间最短。在缓冲区对象上进行处理,然后锁定队列,按下对象引用,然后立即解锁队列。然后发出信号(或类似信号),以便下一个线程可以处理该对象。

不要保持队列锁定!该互斥体不应该存在于其他线程等待工作 - 这是为了保护队列不受多个访问的影响。您需要一些其他信号来维护队列计数和线程等待工作。不要为此使用事件,无论您在网上看到多少其他示例 - 如果必须推出自己的生产者 - 消费者队列,请使用信号量。

更好 - 使用已经工作的P-C队列类 - 查看BlockingCollections类。