2013-03-08 14 views
0

我正在使用boost库来开发异步udp通信。在接收端收到的数据正由另一个线程进行处理。那么我的问题是,当我在另一个线程而不是接收线程中读取接收到的数据时,它会给出一个修改的数据或更新的数据,而这些数据不是应该是的数据。 我的代码正在处理发件人端和接收端的无符号字符缓冲区数组。原因是我需要考虑无符号字符缓冲器作为数据的分组 例如缓冲液[2] = Engine_start_ID如何同步多线程程序中正在处理的数据?

/* global buffer to store the incomming data 
    unsigned char received_buffer[200]; 
    /* 
    global buffer accessed by another thread 
     which contains copy the received_buffer 
    */ 
    unsigned char read_hmi_buffer[200]; 
    boost::mutex hmi_buffer_copy_mutex; 


     void udpComm::start_async_receive() { 
      udp_socket.async_receive_from(
      boost::asio::buffer(received_buffer, max_length), remote_endpoint, 
      boost::bind(&udpComm::handle_receive_from, this, 
      boost::asio::placeholders::error, 
      boost::asio::placeholders::bytes_transferred)); 
     } 
    /* the data received is stored in the unsigned char received_buffer data buffer*/ 

    void udpComm::handle_receive_from(const boost::system::error_code& error, 
     size_t bytes_recvd) { 
     if (!error && bytes_recvd > 0) { 
      received_bytes = bytes_recvd; 
        hmi_buffer_copy_mutex.lock(); 
      memcpy(&read_hmi_buffer[0], &received_buffer[0], received_bytes); 
        hmi_buffer_copy_mutex.unlock(); 

        /*data received here is correct 'cus i printed in the console 
         checked it 
        */ 
        cout<<(int)read_hmi_buffer[2]<<endl; 
     } 

      start_async_receive(); 

     } 
     /* io_service is running in a thread 
     */ 
     void udpComm::run_io_service() { 
     udp_io_service.run(); 
     usleep(1000000); 
     } 

上面的代码运行的线程

/*我的第二线程功能的异步UDP通信是*/

 void thread_write_to_datalink() 
     { hmi_buffer_copy_mutex.lock(); 
      /* here is my problem begins*/ 
       cout<<(int)read_hmi_buffer[2]<<endl; 
      hmi_buffer_copy_mutex.unlock(); 

      /* all data are already changed */ 

       serial.write_to_serial(read_hmi_buffer, 6); 

     } 
     /* threads from my main function 
      are as below */ 


      int main() { 
       receive_from_hmi.start_async_receive(); 
       boost::thread thread_receive_from_hmi(&udpComm::run_io_service, 
       &receive_from_hmi); 
       boost::thread thread_serial(&thread_write_to_datalink); 
       thread_serial.join(); 
       thread_receive_from_hmi.join(); 
       return 0; 
     } 

/*该Serial_manager类包含函数书面方式和从串行端口读*/

#include <iostream> 
      #include <boost/thread.hpp> 
      #include <boost/asio.hpp> 
      #include <boost/date_time/posix_time/posix_time.hpp> 


      using namespace boost::asio; 

      class Serial_manager { 
      public: 

       Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name); 
       void open_serial_port(); 
       void write_to_serial(void *data, int size); 
       size_t read_from_serial(void *data, int size); 
       void handle_serial_exception(std::exception &ex); 
       virtual ~Serial_manager(); 
       void setDeviceName(char* deviceName); 

      protected: 
       io_service &port_io_service; 
       serial_port datalink_serial_port; 
       bool serial_port_open; 
       char *device_name; 

      }; 

      void Serial_manager::setDeviceName(char* deviceName) { 
       device_name = deviceName; 
      } 
      Serial_manager::Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name): 
       port_io_service(serial_io_service), 
       datalink_serial_port(serial_io_service) { 
       device_name = dev_name; 
       serial_port_open = false; 
       open_serial_port(); 
      } 

      void Serial_manager::open_serial_port() { 
       bool temp_port_status = false; 
       bool serial_port_msg_printed = false; 
       do { 
        try { 
         datalink_serial_port.open(device_name); 
         temp_port_status = true; 
        } catch (std::exception &ex) { 
         if (!serial_port_msg_printed) { 
          std::cout << "Exception-check the serial port device " 
            << ex.what() << std::endl; 
          serial_port_msg_printed = true; 
         } 
         datalink_serial_port.close(); 
         temp_port_status = false; 
        } 

       } while (!temp_port_status); 
       serial_port_open = temp_port_status; 
       std::cout <<std::endl <<"serial port device opened successfully"<<std::endl; 
       datalink_serial_port.set_option(serial_port_base::baud_rate(115200)); 
       datalink_serial_port.set_option(
         serial_port_base::flow_control(
           serial_port_base::flow_control::none)); 
       datalink_serial_port.set_option(
         serial_port_base::parity(serial_port_base::parity::none)); 
       datalink_serial_port.set_option(
         serial_port_base::stop_bits(serial_port_base::stop_bits::one)); 
       datalink_serial_port.set_option(serial_port_base::character_size(8)); 

      } 

      void Serial_manager::write_to_serial(void *data, int size) { 
       boost::asio::write(datalink_serial_port, boost::asio::buffer(data, size)); 
      } 

      size_t Serial_manager::read_from_serial(void *data, int size) { 
       return boost::asio::read(datalink_serial_port, boost::asio::buffer(data, size)); 
      } 
      void Serial_manager::handle_serial_exception(std::exception& ex) { 
       std::cout << "Exception-- " << ex.what() << std::endl; 
       std::cout << "Cannot access data-link, check the serial connection" 
         << std::endl; 
       datalink_serial_port.close(); 
       open_serial_port(); 
      } 

      Serial_manager::~Serial_manager() { 
       // TODO Auto-generated destructor stub 
      } 

我认为我的问题领域是关于线程同步和通知,如果你帮我,我会很高兴。您不应该担心发件人完美地工作,因为我已经检查过它的数据是在接收者线程收到的。我希望你能理解我的问题。

编辑:这里是modification.My整体思路这里是发展因此按照我的设计,我有通过 UDP通信发送命令的客户端应用程序的手动飞行控制仿真。在接收方打算使用3个线程。一个线程接收输入从枝即空隙start_hotas()第二线程是从发送方(客户端)接收命令线程:空隙udpComm :: run_io_service()和第三是空隙thread_write_to_datalink()

   /* a thread that listens for input from sticks*/ 
      void start_hotas() { 
     Hotas_manager hotasobj; 
     __s16 event_value; /* value */ 
     __u8 event_number; /* axis/button number */ 
     while (1) { 
      hotasobj.readData_from_hotas(); 


      event_number = hotasobj.getJoystickEvent().number; 
      event_value = hotasobj.getJoystickEvent().value; 
      if (hotasobj.isAxisPressed()) { 
       if (event_number == 0) { 
        aileron = (float) event_value/32767; 

       } else if (event_number == 1) { 
        elevator = -(float) event_value/32767; 

       } else if (event_number == 2) { 
        rudder = (float) event_value/32767; 

       } else if (event_number == 3) { 
        brake_left = (float) (32767 - event_value)/ 65534; 


       } else if (event_number == 4) { 


       } else if (event_number == 6) { 


       } else if (event_number == 10) { 


       } else if (event_number == 11) { 

       } else if (event_number == 12) { 



       } 
      } else if (hotasobj.isButtonPressed()) { 



      } 
      usleep(1000); 

     } 

    } 


      /* 
    * Hotas.h 
    * 
    * Created on: Jan 31, 2013 
    *  Author: metec 
    */ 

    #define JOY_DEV "/dev/input/js0" 
    #include <iostream> 
    #include <boost/thread.hpp> 
    #include <boost/asio.hpp> 
    #include <boost/date_time/posix_time/posix_time.hpp> 
    #include <linux/joystick.h> 

    bool message_printed = false; 
    bool message2_printed = false; 

    class Hotas_manager { 
    public: 
     Hotas_manager(); 
     virtual ~Hotas_manager(); 
     void open_hotas_device(); 

     /* 
     * 
     * read from hotas input 
     * used to the updated event data and status of the joystick from the 
     * the file. 
     * 
     */ 
     void readData_from_hotas(); 

     js_event getJoystickEvent() { 
      return joystick_event; 
     } 

     int getNumOfAxis() { 
      return num_of_axis; 
     } 

     int getNumOfButtons() { 
      return num_of_buttons; 
     } 

     bool isAxisPressed() { 
      return axis_pressed; 
     } 

     bool isButtonPressed() { 
      return button_pressed; 
     } 

     int* getAxis() { 
      return axis; 
     } 

     char* getButton() { 
      return button; 
     } 

    private: 
     int fd; 
     js_event joystick_event; 
     bool hotas_connected; 
     int num_of_axis; 
     int num_of_buttons; 
     int version; 
     char devName[80]; 

     /* 
     * the the variables below indicates 
     * the state of the joystick. 
     */ 
     int axis[30]; 
     char button[30]; 
     bool button_pressed; 
     bool axis_pressed; 
    }; 

    Hotas_manager::Hotas_manager() { 
     // TODO Auto-generated constructor stub 
     hotas_connected = false; 
     open_hotas_device(); 
     std::cout << "joystick device detected" << std::endl; 

    } 

    Hotas_manager::~Hotas_manager() { 
     // TODO Auto-generated destructor stub 

    } 

    void Hotas_manager::open_hotas_device() { 
     bool file_open_error_printed = false; 
     while (!hotas_connected) { 
      if ((fd = open(JOY_DEV, O_RDONLY)) > 0) { 
       ioctl(fd, JSIOCGAXES, num_of_axis); 
       ioctl(fd, JSIOCGBUTTONS, num_of_buttons); 
       ioctl(fd, JSIOCGVERSION, version); 
       ioctl(fd, JSIOCGNAME(80), devName); 
       /* 
       * NON BLOCKING MODE 
       */ 
       ioctl(fd, F_SETFL, O_NONBLOCK); 
       hotas_connected = true; 
      } else { 
       if (!file_open_error_printed) { 
        std::cout << "hotas device not detected. check " 
          "whether it is " 
          "plugged" << std::endl; 
        file_open_error_printed = true; 
       } 
       close(fd); 
       hotas_connected = false; 
      } 
     } 

    } 

    void Hotas_manager::readData_from_hotas() { 
     int result; 
     result = read(fd, &joystick_event, sizeof(struct js_event)); 
     if (result > 0) { 
      switch (joystick_event.type & ~JS_EVENT_INIT) { 
      case JS_EVENT_AXIS: 
       axis[joystick_event.number] = joystick_event.value; 
       axis_pressed = true; 
       button_pressed = false; 
       break; 
      case JS_EVENT_BUTTON: 
       button[joystick_event.number] = joystick_event.value; 
       button_pressed = true; 
       axis_pressed = false; 
       break; 
      } 
      message2_printed = false; 
      message_printed = false; 

     } else { 
      if (!message_printed) { 
       std::cout << "problem in reading the stick file" << std::endl; 
       message_printed = true; 
      } 
      hotas_connected = false; 
      open_hotas_device(); 
      if (!message2_printed) { 
       std::cout << "stick re-connected" << std::endl; 
       message2_printed = true; 
      } 

     } 

    } 



I updated the main function to run 3 threads . 




    int main() { 


       boost::asio::io_service receive_from_hmi_io; 
       udpComm receive_from_hmi(receive_from_hmi_io, 6012); 
       receive_from_hmi.setRemoteEndpoint("127.0.0.1", 6011); 
       receive_from_hmi.start_async_receive(); 
       boost::thread thread_receive_from_hmi(&udpComm::run_io_service, 
         &receive_from_hmi); 
       boost::thread thread_serial(&thread_write_to_datalink); 
       boost::thread thread_hotas(&start_hotas); 
       thread_hotas.join(); 
       thread_serial.join(); 
       thread_receive_from_hmi.join(); 
       return 0; 
       } 

空隙thread_write_to_datalink()也将数据写入来自hotas_manager(操纵杆)。

  void thread_write_to_datalink() { 

       /* 
       * boost serial communication 
       */ 
       boost::asio::io_service serial_port_io; 
       Serial_manager serial(serial_port_io, (char*) "/dev/ttyUSB0"); 


       cout << "aileron " << "throttle " << "elevator " << endl; 
       while (1) { 

         // commands from udp communication  

         serial.write_to_serial(read_hmi_buffer, 6); 

        // data come from joystick inputs 

        //cout << aileron<<" "<<throttle<<" "<<elevator<< endl; 
        memcpy(&buffer_manual_flightcontrol[4], &aileron, 4); 
        memcpy(&buffer_manual_flightcontrol[8], &throttle, 4); 
        memcpy(&buffer_manual_flightcontrol[12], &elevator, 4); 

        unsigned char temp; 


        try { 
         serial.write_to_serial(buffer_manual_flightcontrol, 32); 
         //serial.write_to_serial(buffer_manual_flightcontrol, 32); 
        } catch (std::exception& exp) { 
         serial.handle_serial_exception(exp); 
        } 

        try { 
         serial.write_to_serial(buffer_payloadcontrol, 20); 
        } catch (std::exception& exp) { 
         serial.handle_serial_exception(exp); 
        } 

        usleep(100000); 

       } 

      } 

我的问题是我可以如何更好地设计同步这3个线程。如果你的回答说你不需要使用3个线程我需要你告诉我如何。

+0

请在您的代码中使用一致的缩进,它目前很难理解。 – 2013-03-10 03:41:09

+0

也发布'serial.write_to_serial()'的代码,这对我而言并不明显,为什么你需要多线程。 – 2013-03-10 03:41:53

+0

我做了一些修改,但数据不同步。你可以再次检查一遍。 – 2013-03-11 06:59:54

回答

0

让我们从多线程备份一下,你的程序混合了同步和异步操作。你不需要这样做,因为它只会造成混乱。您可以将从UDP套接字读取的缓冲区异步写入串行端口。这可以通过运行io_service的单个线程来实现,消除了任何并发问题。

您需要添加缓冲区管理,以便在串口的async_write的生命周期内将套接字的数据读取保持在范围内,请以async UDP server为例。此外,在async_write

缓冲器写入包含数据

一种或多种缓冲剂研究文档,特别是requirements for buffer lifetime。 虽然可以根据需要复制缓冲区对象,但调用程序保留底层内存块的所有权,必须保证它们在调用处理程序前保持有效。

一旦你已经完成了设计,那么你就可以移动到more advanced技术,比如线程池或多个io_service秒。

+0

谢谢!你的答案是有帮助的。我使用多线程的原因是** void void thread_write_to_datalink()**还向串口写入从**游戏杆输入**接收到的数据。而不是创建该类的两个实例** Serial_manager **我希望它是一个实例,并且可以使用它从udpComm和游戏杆类写入串行端口数据。 – 2013-03-13 12:33:09

+0

@按照目前的措辞,不要提及操纵杆或操纵杆类,请点击您的问题。请编辑您的问题以包含代表性代码,以便我们能够准确地提供答案。 – 2013-03-14 01:52:44

0

您需要让您访问read_hmi_buffer同步。

因此,只要一段代码在该缓冲区中读取或写入,就需要一个互斥锁(std::mutex,pthread_mutex_t或与之等效的窗口)来锁定。

查看此question了解有关概念和其他教程链接的一些解释。

+0

我使用互斥锁,但它不能解决它。可能是我可以使用seprate – 2013-03-11 07:01:16

+0

你没有保护该行:'serial.write_to_serial(read_hmi_buffer,6);'。 – didierc 2013-03-11 07:12:09

+0

另外,请不要在您的问题更新中包含回答提出的修复,否则可能会混淆新人。 – didierc 2013-03-11 07:29:07

相关问题