2014-12-24 41 views
0

我目前正在使用一个小型servlet通过TCP发送模拟数据,使用boost :: asio作为网络部分。我设法在我的机器上的两个进程之间获得通信(简单的客户端是用Python编写的)。问题在于相同的数据不断通过套接字发送,而不是被更新。Boost :: ASIO多线程写入过时数据到套接字?

我使用两个线程:一个运行模拟,创建数据,并使用当前数据更新服务器的连接对象。第二个运行服务器,每隔一段时间都会将当前数据写入套接字。我已经创建了一个最小的例子在这里与你分享(它使用MSVC++ 12.0进行编译,并且如果你喜欢复制,就有我在说的问题)。

tcp_server * server; 
bool connected = false; 

void runServer() { 
    try 
    { 
     boost::asio::io_service io_service; 
     server = new tcp_server(io_service); 

     connected = true; 
     io_service.run(); 
    } 
    catch (std::exception& e) 
    { 
     std::cerr << e.what() << std::endl; 
    } 
} 

void runSim() { 
    for (int i = 0; i < 1000; i++) { 
     if (connected) 
      server->setData("Current Message: " + std::to_string(i)); 

     boost::this_thread::sleep(boost::posix_time::seconds(1)); 
    } 
} 

int _tmain(int argc, _TCHAR* argv[]) 
{ 
    boost::thread serverThread(runServer); 
    boost::thread simThread(runSim); 

    simThread.join(); 
    serverThread.join(); 

    return 0; 
} 

这里有两个类,TCP_Connection和TCP_Server。这些非常接近地复制boost网站上的boost :: asio教程中找到的那些。

class tcp_connection 
    : public boost::enable_shared_from_this<tcp_connection> 
{ 
public: 
    typedef boost::shared_ptr<tcp_connection> pointer; 

    static pointer create(boost::asio::io_service& io_service) 
    { 
     return pointer(new tcp_connection(io_service)); 
    } 

    tcp::socket& socket() 
    { 
     return socket_; 
    } 

    void start() 
    { 
     message_ = make_daytime_string(); 

     boost::asio::async_write(socket_, boost::asio::buffer(message_), 
      boost::bind(&tcp_connection::handle_write, shared_from_this())); 
    } 

    void setData(std::string msg) { 
     boost::unique_lock<boost::shared_mutex> msgLock(msgMutex, boost::try_to_lock); 
     if (msgLock.owns_lock()) { 
      message_ = msg; 
     } 
    } 

private: 
    tcp_connection(boost::asio::io_service& io_service) 
     : socket_(io_service) 
    { 
     timer_ = new boost::asio::deadline_timer(io_service,boost::posix_time::milliseconds(250)); 
    } 

    void handle_write() 
    { 
     boost::shared_lock<boost::shared_mutex> msgLock(msgMutex); 
     std::cout << "Writing to socket: " << message_ << std::endl; 
     boost::asio::write(socket_, boost::asio::buffer(message_));  

     timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500)); 
     timer_->async_wait(boost::bind(&tcp_connection::handle_write, shared_from_this())); 
    } 

    tcp::socket socket_; 
    std::string message_; 
    int counter_; 
    boost::asio::deadline_timer * timer_; 
    boost::shared_mutex msgMutex; 
}; 

class tcp_server 
{ 
public: 
    tcp_server(boost::asio::io_service& io_service) 
     : acceptor_(io_service, tcp::endpoint(tcp::v4(), 13)) 
    { 
     start_accept(); 
    } 

    void setData(std::string msg) { 
     if (current_connection != NULL) { 
      current_connection->setData(msg); 
     } 
    } 

private: 
    void start_accept() 
    { 
     tcp_connection::pointer new_connection = 
      tcp_connection::create(acceptor_.get_io_service()); 

     acceptor_.async_accept(new_connection->socket(), 
      boost::bind(&tcp_server::handle_accept, this, new_connection, 
      boost::asio::placeholders::error)); 

     current_connection = new_connection; 
    } 

    void handle_accept(tcp_connection::pointer new_connection, 
     const boost::system::error_code& error) 
    { 
     if (!error) 
     { 
      new_connection->start(); 
      std::cout << "New Connection on 127.0.0.1" << std::endl; 
     } 

     start_accept(); 
    } 

    tcp::acceptor acceptor_; 
    tcp_connection::pointer current_connection; 
}; 

通过明智地使用std ::法院的,我已经成功地确定服务器线程从模拟线程获取数据,并连接对象被传递一样好(因为使用setData( )方法正在被调用,当它应该)。无论出于何种原因,它看起来像连接的成员'message_'没有被更新。我也知道连接没有从控制台的“新建连接”更新重置或重新创建。

+1

您似乎正在使用异步'boost :: asio :: async_write()'操作混合同步'boost :: asio :: write()'操作。同样,为什么'async_wait()'在socket上写入另一个数据流的完成处理程序? –

+0

嗯,我这样做是因为这是我可以找出如何在传输之间设置一段时间(async_wait位于具有指定周期的截止时间定时器)的唯一方法。我的想法是写入重复发生,数据由另一个线程更新,这就是完成处理程序开始另一个写入和等待的原因。 – gankoji

+0

我应该补充说我只添加了代码来使用阻塞boost :: asio :: write()。 async_write()来自我用作此应用程序基础的教程,如果需要或仅使用一个或另一个的理由,可以很容易地将其更改为阻止写入。 – gankoji

回答

1

好的,萨姆米勒在这里得到了答案的功劳,但他把它作为评论发布,所以我现在正在回答关闭这个问题,我已经明白了。最终,该错误很可能是交错写入调用和访问对象数据的问题。我重写了我的示例代码,仅使用Sam提供的其他答案中已经链接的准则,仅包含一个类(而不是上面的两个)。我也让所有的写操作都是异步的。现在的代码如下:

#include <iostream> 
#include <string> 
#include <boost/bind.hpp> 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <deque> 

using boost::asio::ip::tcp; 
using namespace std; 

class tcp_server { 
public: 
    tcp_server(boost::asio::io_service& io_service) 
     : _acceptor(io_service, tcp::endpoint(tcp::v4(), 5005)), _socket(io_service) 
    { 
     messages = std::deque<std::string> (1,"Hello from Jake's shitty server"); 

     timer_ = new boost::asio::deadline_timer(io_service, boost::posix_time::milliseconds(250)); 

     start_accept(); 
    } 

    void write(std::string message) { 
     boost::unique_lock<boost::shared_mutex> queueLock(queueMutex); 
     messages.push_back(message); 
     if (messages.size() <= 1) 
      handle_write(); 
    } 
private: 
    void start_accept() { 
     _acceptor.async_accept(_socket, 
      boost::bind(&tcp_server::handle_accept, this, 
      boost::asio::placeholders::error)); 
    } 

    void handle_accept(boost::system::error_code e) { 
     if (!messages.empty()) { 

      _message = messages.front(); 
      messages.pop_front(); 

      boost::asio::async_write(_socket, boost::asio::buffer(_message), 
       boost::bind(&tcp_server::handle_write, this)); 

     } 
    } 

    void handle_write() { 

     if (!messages.empty()) { 
      timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500)); 
      timer_->async_wait(boost::bind(&tcp_server::handle_accept, this, boost::asio::placeholders::error)); 
     } 

     return; 
    } 

    std::string _message; 
    std::deque<std::string> messages; 

    tcp::acceptor _acceptor; 
    tcp::socket _socket; 
    boost::asio::deadline_timer * timer_; 
    boost::shared_mutex queueMutex; 


}; 


tcp_server * server; 

void addMessages() { 
    for (int i = 0; i < 10; i++) { 
     server->write("New Message. Count: " + std::to_string(i) + ".\n"); 
    } 
} 


int _tmain(int argc, _TCHAR* argv[]) 
{ 
    boost::asio::io_service io_service; 
    server = new tcp_server(io_service); 

    server->write("Hey there sexy"); 
    boost::thread messenger(addMessages); 

    io_service.run(); 

    return 0; 
} 

TL; DR使用消息队列,并且不要混合使用异步/同步写入。

另外,我在处理这个时遇到了一个有趣的问题,那就是我使用从消息队列中弹出的临时字符串填充boost :: asio :: buffer。这使VS 2013的调试断言失败,说一个字符串迭代器是不可忽略的。一旦我将_message属性添加到类中,并用它来构建缓冲区,一切都运行良好。在这里发现小费:Expression: string iterator not dereferencable while using boost regex。感谢您的帮助山姆!

+1

很高兴你知道了! –