2016-07-07 100 views
0

因此,我正在写一种示波器式的程序,它读取计算机上的串行端口并对该数据执行fft以将其转换为频谱。我遇到了一个问题,虽然我的程序的布局分解成SerialHandler类(利用boost::Asio),FFTHandler类和main函数。 SerialHandler类使用boost::Asio`` async_read_some函数从端口读取并引发一个名为HandleOnPortReceive的事件,然后它自己读取数据。C++信号量混乱?

问题是我找不到将事件处理程序中的数据从另一个线程上的io_service对象传递到另一个线程上的FFTHandler类的方法。我被推荐使用信号量来解决我的问题,但是我对semaphore.h的用法几乎一无所知,所以我的实现现在相当破碎,并且没有做任何事情。

下面是一些代码,如果这使得它更清晰一点:

using namespace Foo; 
//main function  
int main(void){ 
    SerialHandler serialHandler; 
    FFTHandler fftHandler; 

    sem_t *qSem_ptr = &qSem; 

    sem_init(qSem_ptr, 1, 0); 

    //create separate threads for both the io_service and the AppendIn so that neither will block the user input statement following 
    serialHandler.StartConnection(tempInt, tempString); //these args are defined, but for brevity's sake, I ommitted the declaration 

    t2= new boost::thread(boost::bind(&FFTHandler::AppendIn, &fftHandler, q, qSem)); 

    //allow the user to stop the program and avoid the problem of an infinite loop blocking the program 
    char inChar = getchar(); 
    if (inChar) {...some logic to stop reading} 
} 




namespace Foo{ 

    boost::thread *t1; 
    boost::thread *t2; 
    sem_t qSem; 
    std::queue<double> q; 
    boost::mutex mutex_; 

    class SerialHandler{ 
    private: 
     char *rawBuffer; //array to hold incoming data 
     boost::asio::io_service ioService; 
     boost::asio::serial_port_ptr serialPort; 
    public: 

      void SerialHandler::StartConnection(int _baudRate, string _comPort){ 
       //some functionality to open the port that is irrelevant to the question goes here 

       AsyncReadSome(); //starts the read loop 

       //create thread for io_service object and let function go out of scope 
       t1 = new boost::thread(boost::bind(&boost::asio::io_service::run, &ioService)); 

      } 

      void SerialHandler::AsyncReadSome(){ 

       //there's some other stuff here for error_catching, but this is the only important part 
       serialPort->async_read_some (
          boost::asio::buffer(rawBuffer, SERIAL_PORT_READ_BUF_SIZE), 
          boost::bind(
            &SerialHandler::HandlePortOnReceive, 
            this, boost::asio::placeholders::error, 
            boost::asio::placeholders::bytes_transferred, q)); 
      } 

      void SerialHandler::HandlePortOnReceive(const boost::system::error_code& error, size_t bytes_transferred, std::queue<double>& q){ 
       boost::mutex::scoped_lock lock(mutex_); 
       //more error checking goes here, but I've made sure they aren't returning and are not the issue 

       for (unsigned int i =0; i<bytes_transferred; i++){ 
        unsigned char c = rawBuffer[i]; 
        double d = (double) c; //loop through buffer and read 
        if (c==endOfLineChar){ 
        } else //if not delimiting char, push into queue and post semaphore 
        { 
          q.push(d); 
          //cout << d << endl; 
          sem_post(&qSem); 
          cout << q.front() << endl; 
          cout << "size is: " << q.size() << endl; 
        } 
       } 
       //loop back on itself and start the next read 
       AsyncReadSome(); 

      } 
    } 

    class FFTHandler{ 
    private: 
     double *in; //array to hold inputs 
     fftw_complex *out; //holds outputs 
     int currentIndex; 
     bool filled; 
     const int N; 
    public: 


     void AppendIn(std::queue<double> &q, sem_t &qSem){ 
       while(1){ //this is supposed to stop thread from exiting and going out of scope...it doesn't do that at all effectively... 
        cout << "test" << endl; 
        sem_wait(&_qSem); //wait for data...this is blocking but I don't know why 
        double d = _q.front(); 
        _q.pop(); 
        in[currentIndex]=d; //read queue, pop, then append in array 
        currentIndex++; 
        if (currentIndex == N){ //run FFT if full and reset index 
          currentIndex = N-overlap-1; 
          filled = true; 
          RunFFT(); 
        } 
       } 

     } 
    } 

} 

,在FFTHandler::AppendIn(..)调试行确实是射击,所以正在创建线程,但它immediateley走出去的范围似乎并破坏了线程,因为它似乎已经设置了对信号量进行错误响应的时间。

TLDR:这是一个很长的解释简单地说,“我不明白,但信号灯需要以某种方式实现这些我努力了,失败了,所以现在我来这里希望得到这个代码的帮助。从别人比我更了解

更新:所以一些调试语句玩耍后,似乎问题是while(1){...}声明确实是射击,但该sem_wait(&_qSem);导致它块什么的。因为它无限期地等待着,尽管信号量已经被发布,但它仍然在等待,并且永远不会超出该线。

+1

我看不出任何具体的_semaphore usage_错误,但是我可以看到线程潜在的问题 - 在线程完成之前'main'退出,例如? (从你刚刚说的代码中不清楚“...一些逻辑停止阅读”) –

+2

为什么你有Seri​​alHandler和FFTHandler运行在不同的线程?如果意图是将工作传递给另一个线程,以便SerialHandler可以接收下一组数据,那么您可能需要考虑Leader/Followers模式,并有一个运行io_service的线程池。在这里,每个线程都将使用SerialHandler读取数据并使用FFTHandler处理数据,一旦完成,将返回到池中以等待下一个io事件。 – aichao

+0

@aichao我有Seri​​alHandler io_service在一个单独的线程上运行,因为它在处理程序中循环回去,因此它创建了一个无限循环。但是,我希望能够停止程序,不可阻挡的无限循环并不是一个理想的功能,所以我将它移动到另一个线程,以便能够接受用户输入以最终在用户感觉喜欢时终止该循环。 'FFTHandler :: AppendIn(...)'函数在它自己的线程中是出于类似的原因,不阻塞主线程并释放用户在程序运行时能够做的事情(即停止程序) 。 – Scorch

回答

2

既然您已经在使用boost::mutex及其范围锁定类型,我建议您使用boost::condition_variable而不是POSIX信号量。否则,你将C++ 11风格的同步与POSIX同步混合在一起。

添加到队列时锁定互斥锁,但我没有看到锁定互斥锁以从队列中读取的任何内容。它也看起来像循环回调到AsyncReadSome而互斥锁仍处于锁定状态。

选择单一形式的同步,然后正确使用它。

+0

感谢您的建议,而这似乎清除了一些混淆。我拉动信号量,并根据你的想法使用boost :: condition_variable,这似乎有效。虽然它引入了另一个问题(分段错误),但据我所知,这与信令和条件变量无关。 – Scorch

1

信号量的初始值为0,对此情况有效。所以它需要一个sem_post为FFTHandler::AppendIn()被解除封锁。但是我没有看到第一次调用SerialHandler::AsyncReadSome()的代码来读取串行端口并将其发送到队列中。如果你修复了那部分代码,我认为sem_post会发生并且FFTHandler线程会运行。作为第一步,您可以在sem_wait和AsyncReadSome()函数内打印一个调试打印,而我的猜测是两者都不会被执行。

所以,基本上你会希望确保'阅读'被启动并作为主线程或不同线程的一部分保持活动状态。

+0

我很抱歉,那是我的错,我意识到我在复制代码时犯了一个错误。在创建新线程t1之前,我正在调用AsyncReadSome()。我会更新上面的代码。但是,为了以防万一,我确实创建了一些调试行,并且这些行正在触发,因此它正在执行AsyncReadSome()。这就是说,在玩弄你的建议的同时,我发现了一些与信号量有关的有趣事情,所以我也会将其添加到该问题中。 – Scorch

+0

您在更新中提出的观点正是我的意思:)感谢您保持发布。 – Gops