Boost Asio async_read sometimes hangs while reading, but not always

2014-03-25

I am implementing a small distributed system of N machines. Each of them receives some data from a remote server and then propagates that data to the other n-1 peer machines. I am using Boost Asio async_read and async_write to implement this. I set up a test cluster of N=30 machines. With smaller data (75KB to 750KB per machine) the program always worked. But when I moved to a slightly larger data set (7.5MB), I observed strange behavior: at the start, reads and writes happened as expected, but after a while some machines hung while others finished, and the set of hanging machines varied from run to run. I printed messages in every handler and found that on the hanging machines, async_read basically could not read successfully after some point, so nothing could proceed afterwards. I checked the remote servers; they had all finished writing. I tried using a strand to control the order of execution of the asynchronous reads and writes, and I also tried using separate io_services for reading and writing. Neither solved the problem. I am pretty desperate. Can anyone help me?

Here is the code of the class that does the reading and propagating:

const int TRANS_TUPLE_SIZE=15; 
const int TRANS_BUFFER_SIZE=5120/TRANS_TUPLE_SIZE*TRANS_TUPLE_SIZE; 
class Asio_Trans_Broadcaster 
{ 
private: 
    char buffer[TRANS_BUFFER_SIZE]; 
    int node_id; 
    int mpi_size; 
    int mpi_rank; 
    boost::asio::ip::tcp::socket* dbsocket; 
    boost::asio::ip::tcp::socket** sender_sockets; 
    int n_send; 
    boost::mutex mutex; 
    bool done; 
public: 
    Asio_Trans_Broadcaster(boost::asio::ip::tcp::socket* dbskt, boost::asio::ip::tcp::socket** senderskts, 
     int msize, int mrank, int id) 
{ 
    dbsocket=dbskt; 
    count=0; 
    node_id=id; 
    mpi_size=mpi_rank=-1; 
    sender_sockets=senderskts; 
    mpi_size=msize; 
    mpi_rank=mrank; 
    n_send=-1; 
    done=false; 
} 

static std::size_t completion_condition(const boost::system::error_code& error, std::size_t bytes_transferred) 
{ 
    int remain=bytes_transferred%TRANS_TUPLE_SIZE; 
    if(remain==0 && bytes_transferred>0) 
     return 0; 
    else 
     return TRANS_BUFFER_SIZE-bytes_transferred; 
} 


void write_handler(const boost::system::error_code &ec, std::size_t bytes_transferred) 
{ 
    int n=-1; 
    mutex.lock(); 
    n_send--; 
    n=n_send; 
    mutex.unlock(); 
    fprintf(stdout, "~~~~~~ @%d, write_handler: %d bytes, copies_to_send: %d\n", 
            node_id, bytes_transferred, n); 
    if(n==0 && !done) 
     boost::asio::async_read(*dbsocket, 
      boost::asio::buffer(buffer, TRANS_BUFFER_SIZE), 
      Asio_Trans_Broadcaster::completion_condition, boost::bind(&Asio_Trans_Broadcaster::broadcast_handler, this, 
      boost::asio::placeholders::error, 
      boost::asio::placeholders::bytes_transferred)); 
} 

void broadcast_handler(const boost::system::error_code &ec, std::size_t bytes_transferred) 
{ 
    fprintf(stdout, "@%d, broadcast_handler: %d bytes, mpi_size:%d, mpi_rank: %d\n", node_id, bytes_transferred, mpi_size, mpi_rank); 
    if (!ec) 
    { 
     int pos=0; 
     while(pos<bytes_transferred && pos<TRANS_BUFFER_SIZE) 
     { 
      int id=-1; 
      memcpy(&id, &buffer[pos], 4); 
      if(id<0) 
      { 
       done=true; 
       fprintf(stdout, "@%d, broadcast_handler: done!\n", mpi_rank); 
       break; 
      } 

      pos+=TRANS_TUPLE_SIZE; 
     } 

     mutex.lock(); 
     n_send=mpi_size-1; 
     mutex.unlock(); 
     for(int i=0; i<mpi_size; i++) 
      if(i!=mpi_rank) 
      { 
       boost::asio::async_write(*sender_sockets[i], boost::asio::buffer(buffer, bytes_transferred), 
           boost::bind(&Asio_Trans_Broadcaster::write_handler, this, 
           boost::asio::placeholders::error, 
           boost::asio::placeholders::bytes_transferred)); 
      } 
    } 
    else 
    { 
     cerr<<mpi_rank<<" error: "<<ec.message()<<endl; 
     delete this; 
    } 


} 

void broadcast() 
{ 
    boost::asio::async_read(*dbsocket, 
      boost::asio::buffer(buffer, TRANS_BUFFER_SIZE), 
      Asio_Trans_Broadcaster::completion_condition, boost::bind(&Asio_Trans_Broadcaster::broadcast_handler, this, 
      boost::asio::placeholders::error, 
      boost::asio::placeholders::bytes_transferred)); 
} 
}; 

Here is the main code that runs on each machine:

int N=30; 
boost::asio::io_service* sender_io_service=new boost::asio::io_service(); 
boost::asio::io_service::work* p_work=new boost::asio::io_service::work(*sender_io_service); 
boost::thread_group send_thread_pool; 
for(int i=0; i<NUM_THREADS; i++) 
{ 
    send_thread_pool.create_thread(boost::bind(& boost::asio::io_service::run, sender_io_service)); 
} 

boost::asio::io_service* receiver_io_service=new boost::asio::io_service(); 
shared_ptr<boost::asio::io_service::work> p_work2(new boost::asio::io_service::work(*receiver_io_service)); 
boost::thread_group thread_pool2; 
thread_pool2.create_thread(boost::bind(& boost::asio::io_service::run, receiver_io_service)); 

boost::asio::ip::tcp::socket* receiver_socket; 
    //establish nonblocking connection with remote server 
AsioConnectToRemote(5000, 1, receiver_io_service, receiver_socket, true); 

boost::asio::ip::tcp::socket* send_sockets[N]; 
    //establish blocking connection with other machines 
hadoopNodes = SetupAsioConnectionsWIthOthers(sender_io_service, send_sockets, hostFileName, mpi_rank, mpi_size, 3000, false); 

Asio_Trans_Broadcaster* db_receiver=new Asio_Trans_Broadcaster(receiver_socket, send_sockets, 
mpi_size, mpi_rank, mpi_rank); 

db_receiver->broadcast(); 
    p_work2.reset(); 
    thread_pool2.join_all(); 
    delete p_work; 
send_thread_pool.join_all(); 

Do the 'hanging' machines sometimes receive successfully? If so, is there a certain request/response (size)? Also, have you tried making it an SSCCE (with randomized payload data)? – sehe


Yes. The set of 'hanging' machines changes on every run. What is an SSCCE? – user3457135

Answer


I have no idea what your code is trying to achieve. There are too many missing bits.

Of course, if the task is to asynchronously send/receive traffic over network sockets, then Asio is just the thing for it. It's hard to see what is special about your code.

I suggest cleaning up the more obvious problems:

  • There is (almost) no error handling (check your error_code-s!)
  • Unless you're on a funny platform, your format strings should use %lu for size_t
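
    For instance, write_handler above prints bytes_transferred (a size_t) with %d; just that one call, corrected (a sketch, same variables):

    fprintf(stdout, "~~~~~~ @%d, write_handler: %lu bytes, copies_to_send: %d\n",
            node_id, bytes_transferred, n); // %lu matches size_t on common 64-bit platforms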
  • Why mess around with raw arrays, possibly of the wrong size, when you can just use a vector?
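
    A minimal sketch of that change (same constant, everything else untouched):

    std::vector<char> trans_buffer(TRANS_BUFFER_SIZE); // owns its memory and knows its own size
    // boost::asio::buffer(trans_buffer) then supplies both pointer and length,
    // so the size can never silently disagree with the allocation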
  • Never assume the size of objects when you can use sizeof:

    memcpy(&id, &trans_buffer[pos], sizeof(id)); 
    
  • Come to think of it, the indexing of the buffer looks unsafe anyway:

    while(pos < bytes_transferred && pos < TRANS_BUFFER_SIZE) 
    { 
        int id = -1; 
        memcpy(&id, &buffer[pos], sizeof(id)); 
    

    If e.g. pos == TRANS_BUFFER_SIZE-1, the memcpy here invokes undefined behavior...
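
    Since completion_condition only completes on whole multiples of TRANS_TUPLE_SIZE (barring errors), a guard like this (my suggestion, not your original logic) keeps every memcpy in range:

    while(pos + TRANS_TUPLE_SIZE <= bytes_transferred) // a whole tuple must still fit
    {
        int id = -1;
        memcpy(&id, &buffer[pos], sizeof(id)); // sizeof(id) <= TRANS_TUPLE_SIZE, never out of range
        if(id < 0) { done = true; break; }
        pos += TRANS_TUPLE_SIZE;
    }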

  • Why is there so much new going on? You're inviting a host of bugs into your code, as if memory management weren't already the Achilles' heel of low-level coding. Use values, or shared pointers. Never delete this. Ever[1]
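
    The usual Asio lifetime pattern instead (a sketch using the cleaned-up names from the listing below, not your code) is to hold the broadcaster in a shared_ptr and bind handlers to shared_from_this(); the object then lives exactly as long as an outstanding operation refers to it:

    // needs <boost/enable_shared_from_this.hpp>; create instances with boost::make_shared
    class Broadcaster : public boost::enable_shared_from_this<Broadcaster>
    {
    public:
        void broadcast()
        {
            boost::asio::async_read(dbsocket, boost::asio::buffer(trans_buffer),
                &Broadcaster::completion_condition,
                boost::bind(&Broadcaster::broadcast_handler,
                    shared_from_this(), // the bound shared_ptr keeps *this alive
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
        }
        // on error: simply return instead of `delete this`;
        // the last shared_ptr copy releases the object
    };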

  • Why is there so much duplicated code? Why is one thread pool named after the senders and the other just thread_pool2? Which contains one thread. Huh? Why is one work item a raw pointer and the other a shared_ptr?

    You could just have:

    struct service_wrap { 
        service_wrap(int threads)
         : work(boost::asio::io_service::work(io_service)) // engage the work item so run() doesn't return early
        { 
         while(threads--) 
          pool.create_thread(boost::bind(&boost::asio::io_service::run, boost::ref(io_service))); 
        } 
    
        ~service_wrap() { 
         io_service.post(boost::bind(&service_wrap::stop, this)); 
         pool.join_all(); 
        } 
    
    private: // mind the initialization order! 
        boost::asio::io_service io_service; 
        boost::optional<boost::asio::io_service::work> work; 
        boost::thread_group pool; 
    
        void stop() { 
         work = boost::none; 
        } 
    }; 
    

    Then you can simply write:

    service_wrap senders(NUM_THREADS); 
    service_wrap receivers(1); 
    

    Wow. Did you see that? No more chance of errors. If you fix one pool, you automatically fix the other. No more delete-ing the first work item and .reset()-ing the second. In short: no more messy code, and less complexity.

  • Use an exception-safe lock guard:

    int local_n_send = -1; // not clear naming 
    { 
        boost::lock_guard<boost::mutex> lk(mutex); 
        n_send--; 
        local_n_send = n_send; 
    } 
    
  • The body of broadcast() is duplicated in full inside write_handler(). Why not just call it:

    if(local_n_send == 0 && !done) 
        broadcast(); 
    
  • I think there is still a race condition: not a data race on the access to n_send itself, but the decision to re-broadcast may be wrong if n_send reaches zero after the lock has been released. Now, since broadcast() only starts one async operation, you can do it under the lock and get rid of the race condition:

    void write_handler(const error_code &ec, size_t bytes_transferred) { 
        boost::lock_guard<boost::mutex> lk(mutex); 
    
        if(!(done || --n_send)) 
         broadcast(); 
    } 
    

    WOOP WOOP. That's three lines of code now. Less code is fewer bugs.

My guess is that if you keep whittling away at the code like this, you will inevitably find your clues. Think of it like searching for a lost wedding ring: you wouldn't leave the clutter lying around. Instead, you'd go from room to room and tidy up. Throw everything out first, if need be.

IFF you can make this thing self-contained /and/ reproducible, I will debug it further for you!

Cheers

Here's a starting point that I made while staring at the code: Compiling on Coliru

#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/array.hpp> 
#include <boost/make_shared.hpp> 
#include <boost/optional.hpp> 
#include <boost/ptr_container/ptr_vector.hpp> 
#include <cstdio> 
#include <cstring> 
#include <iostream> 

const/*expr*/ int TRANS_TUPLE_SIZE = 15; 
const/*expr*/ int TRANS_BUFFER_SIZE = 5120/TRANS_TUPLE_SIZE * TRANS_TUPLE_SIZE; 

namespace AsioTrans 
{ 
    using boost::system::error_code; 
    using namespace boost::asio; 

    typedef ip::tcp::socket    socket_t; 
    typedef boost::ptr_vector<socket_t> socket_list; 

    class Broadcaster 
    { 
    private: 
     boost::array<char, TRANS_BUFFER_SIZE> trans_buffer; 

     int node_id; 
     int mpi_rank; 

     socket_t& dbsocket; 
     socket_list& sender_sockets; 

     int n_send; 
     boost::mutex mutex; 
     bool done; 
    public: 
     Broadcaster(
      socket_t& dbskt, 
      socket_list& senderskts, 
      int mrank, 
      int id) : 
       node_id(id), 
       mpi_rank(mrank), 
       dbsocket(dbskt), 
       sender_sockets(senderskts), 
       n_send(-1), 
       done(false) 
     { 
      // count=0; 
     } 

     static size_t completion_condition(const error_code& error, size_t bytes_transferred) 
     { 
      // TODO FIXME handle error_code here 
      int remain = bytes_transferred % TRANS_TUPLE_SIZE; 

      if(bytes_transferred && !remain) 
      { 
       return 0; 
      } 
      else 
      { 
       return TRANS_BUFFER_SIZE - bytes_transferred; 
      } 
     } 

     void write_handler(const error_code &ec, size_t bytes_transferred) 
     { 
      // TODO handle errors 
      // TODO check bytes_transferred 
      boost::lock_guard<boost::mutex> lk(mutex); 

      if(!(done || --n_send)) 
       broadcast(); 
     } 

     void broadcast_handler(const error_code &ec, size_t bytes_transferred) 
     { 
      fprintf(stdout, "@%d, broadcast_handler: %lu bytes, mpi_size:%lu, mpi_rank: %d\n", node_id, bytes_transferred, sender_sockets.size(), mpi_rank); 

      if(!ec) 
      { 
       for(size_t pos = 0; (pos < bytes_transferred && pos < TRANS_BUFFER_SIZE); pos += TRANS_TUPLE_SIZE) 
       { 
        int id = -1; 
        memcpy(&id, &trans_buffer[pos], sizeof(id)); 

        if(id < 0) 
        { 
         done = true; 
         fprintf(stdout, "@%d, broadcast_handler: done!\n", mpi_rank); 
         break; 
        } 
       } 

       { 
        boost::lock_guard<boost::mutex> lk(mutex); 
        n_send = sender_sockets.size() - 1; 
       } 

       for(int i = 0; size_t(i) < sender_sockets.size(); i++) 
       { 
        if(i != mpi_rank) 
        { 
         async_write(
           sender_sockets[i], 
           buffer(trans_buffer, bytes_transferred), 
           boost::bind(&Broadcaster::write_handler, this, placeholders::error, placeholders::bytes_transferred)); 
        } 
       } 
      } 
      else 
      { 
       std::cerr << mpi_rank << " error: " << ec.message() << std::endl; 
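       // TODO: per the note above, don't `delete this` -- manage the lifetime with a shared_ptr instead 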
       delete this; 
      } 
     } 

     void broadcast() 
     { 
      async_read(
        dbsocket, 
        buffer(trans_buffer), 
        Broadcaster::completion_condition, 
        boost::bind(&Broadcaster::broadcast_handler, this, 
         placeholders::error, 
         placeholders::bytes_transferred)); 
     } 
    }; 

    struct service_wrap { 
     service_wrap(int threads)
      : _work(io_service::work(_service)) // engage the work item so run() doesn't return early
     { 
      while(threads--) 
       _pool.create_thread(boost::bind(&io_service::run, boost::ref(_service))); 
     } 

     ~service_wrap() { 
      _service.post(boost::bind(&service_wrap::stop, this)); 
      _pool.join_all(); 
     } 

     io_service& service() { return _service; } 

    private: // mind the initialization order! 
     io_service      _service; 
     boost::optional<io_service::work> _work; 
     boost::thread_group    _pool; 

     void stop() { 
      _work = boost::none; 
     } 
    }; 

    extern void AsioConnectToRemote(int, int, io_service&, socket_t&, bool); 
    extern void SetupAsioConnectionsWIthOthers(io_service&, socket_list&, std::string, int, bool); 
} 

int main() 
{ 
    using namespace AsioTrans; 

    // there's no use in increasing #threads unless there are blocking operations 
    service_wrap senders(boost::thread::hardware_concurrency()); 
    service_wrap receivers(1); 

    socket_t receiver_socket(receivers.service()); 
    AsioConnectToRemote(5000, 1, receivers.service(), receiver_socket, true); 

    socket_list send_sockets(30); 
    /*hadoopNodes =*/ SetupAsioConnectionsWIthOthers(senders.service(), send_sockets, "hostFileName", 3000, false); 

    int mpi_rank = send_sockets.size(); 
    AsioTrans::Broadcaster db_receiver(receiver_socket, send_sockets, mpi_rank, mpi_rank); 
    db_receiver.broadcast(); 
} 

[1] Without exception. Unless there's an exception to the rule. Exception-ception.


Thank you so much for cleaning up my code. Sorry for the messy code. I'm quite new to Boost and didn't know the more consistent ways of writing it. I'll try the cleaned-up code and debug further. – user3457135
