2012-06-12 187 views
4

我想用boost::asio来设置一个线程池。 我的问题是:如何将特定数据附加到创建的每个线程,以及如何管理单个输出?使用线程池进行仿真:boost-thread和boost-asio

更具体地说,我写了一个类Simulation,它通过输入一些参数的方法来执行模拟。 该类包含计算所需的全部数据。 由于数据不是太大,我想复制它以便在池的每个线程中使用类Simulation的不同实例。

我愿做这样的事情: (建立一个线程池在此说明:SOAsio recipes

class ParallelSimulation 
{ 
    public: 
    static const std::size_t N = 10; 

    protected: 
    std::vector< boost::shared_ptr<Simulation> > simuInst; // N copy of a reference instance. 

    public: 

    ... 

    // Simulation with a large (>>N) number of inputs 
    void eval(std::vector<SimulationInput> inputs) 
    { 
     // Creation of the pool using N threads 
     asio::io_service io_service; 
     asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < N; ++i) 
     threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

     // Here ? Attaching the duplicates instances of class Simulation ? 

     // Adding tasks 
     for(std::size_t i = 0, i_end = inputs.size(); i<i_end; ++i) 
     io_service.post(...); // add simulation with inputs[i] to the queue 

     // How to deal with outputs ? 

     // End of the tasks 
     io_service.stop(); 
     threads.join_all(); 
    } 
}; 

也许用于建立一个线程池(使用boost::asio),该技术是不适应我的问题。你有什么建议吗? 谢谢。

回答

1

这里是我的研究成果!

分布式仿真基于主类DistributedSimulation使用两个实现类:impl::m_io_serviceimpl::dispatcher

boost::asio线程池基于将io_service::run()方法附加到不同的线程。
这个想法是重新定义这个方法,并包含一个机制来识别当前线程。以下解决方案基于boost::uuid的线程本地存储boost::thread_specific_ptr阅读特雷斯的评论后,我认为使用boost::thread::id识别线程是一个更好的解决方案(但相当于并不太不同)。
最后,另一个类用于将输入数据分派给类Simulation的实例。这个类创建了几个相同类Simulation的实例,并使用它们来计算每个线程的结果。下面

namespace impl { 

    // Create a derived class of io_service including thread specific data (a unique identifier of the thread) 
    struct m_io_service : public boost::asio::io_service 
    { 
    static boost::thread_specific_ptr<boost::uuids::uuid> ptrSpec_; 

    std::size_t run() 
    { 
     if(ptrSpec_.get() == 0) 
     ptrSpec_.reset(new boost::uuids::uuid(boost::uuids::random_generator()()) ); 

     return boost::asio::io_service::run(); 
    } 
    }; 


    // Create a class that dispatches the input data over the N instances of the class Simulation 
    template <class Simulation> 
    class dispatcher 
    { 
    public: 
     static const std::size_t N = 6; 

     typedef Simulation::input_t input_t; 
     typedef Simulation::output_t output_t; 

     friend DistributedSimulation; 

    protected: 
     std::vector< boost::shared_ptr<Simulation> > simuInst; 
     std::vector<boost::uuids::uuid>   map; 

    public: 

     // Constructor, creating the N instances of class Simulation 
     dispatcher(const Simulation& simuRef) 
     { 
     simuInst.resize(N); 
     for(std::size_t i=0; i<N; ++i) 
      simuInst[i].reset(simuRef.clone()); 
     } 

     // Record the unique identifiers and do the calculation using the right instance of class Simulation 
     void dispatch(const Simulation::input_t& in ) 
     { 
     if(map.size() == 0) { 
      map.push_back(*m_io_service::ptrSpec_); 
      simuInst[0]->eval(in, *m_io_service::ptrSpec_); 
     }  
     else { 
      if(map.size() < N) { 
      map.push_back(*m_io_service::ptrSpec_); 
      simuInst[map.size()-1]->eval(in, *m_io_service::ptrSpec_); 
      } 
      else { 
      for(size_t i=0; i<N;++i) { 
       if(map[i] == *m_io_service::ptrSpec_) { 
       simuInst[i]->eval(in, *m_io_service::ptrSpec_); 
       return; 
       } 
      } 
      } 
     } 
     } 
    }; 

    boost::thread_specific_ptr<boost::uuids::uuid> m_io_service::ptrSpec_; 
} 


    // Main class, create a distributed simulation based on a class Simulation 
    template <class Simulation> 
    class DistributedSimulation 
    { 
    public: 
    static const std::size_t N = impl::dispatcher::N; 

    protected: 
    impl::dispatcher _disp; 

    public: 
    DistributedSimulation() : _disp(Simulation()) {} 

    DistributedSimulation(Simulation& simuRef) 
    : _disp(simuRef) { } 


    // Simulation with a large (>>N) number of inputs 
    void eval(const std::vector<Simulation::input_t>& inputs, std::vector<Simulation::output_t>& outputs) 
    { 

     // Clear the results from a previous calculation (and stored in instances of class Simulation) 
     ... 

     // Creation of the pool using N threads 
     impl::m_io_service io_service; 
     boost::asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < N; ++i) 
     threads.create_thread(boost::bind(&impl::m_io_service::run, &io_service)); 

     // Adding tasks 
     for(std::size_t i = 0, i_end = inputs.size(); i<i_end; ++i) 
     io_service.post(boost::bind(&impl::dispatcher::dispatch, &_disp, inputs[i])); 

     // End of the tasks 
     io_service.stop(); 
     threads.join_all(); 

     // Gather the results iterating through instances of class simulation 
     ... 
    } 
    }; 

编辑

的代码是我以前的解决方案的更新,同时考虑到特雷斯的评论。正如我之前所说,它更简单易读!

template <class Simulation> 
    class DistributedSimulation 
    { 
    public: 
     typedef typename Simulation::input_t input_t; 
     typedef typename Simulation::output_t output_t; 

     typedef boost::shared_ptr<Simulation> SimulationSPtr_t; 
     typedef boost::thread::id    id_t;  
     typedef std::map< id_t, std::size_t >::iterator IDMapIterator_t; 

    protected: 
     unsigned int     _NThreads; // Number of threads 
     std::vector<SimulationSPtr_t> _simuInst; // Instances of class Simulation 
     std::map< id_t, std::size_t > _IDMap;  // Map between thread id and instance index. 

    private: 
     boost::mutex _mutex; 

    public: 

     DistributedSimulation() {} 

     DistributedSimulation(const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency()) 
     { init(simuRef, NThreads); } 

     DistributedSimulation(const DistributedSimulation& simuDistrib) 
     { init(simuRef, NThreads); } 

     virtual ~DistributedSimulation() {} 

     void init(const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency()) 
     { 
     _NThreads = (NThreads == 0) ? 1 : NThreads; 
     _simuInst.resize(_NThreads); 
     for(std::size_t i=0; i<_NThreads; ++i) 
      _simuInst[i].reset(simuRef.clone()); 
     _IDMap.clear(); 
     } 


     void dispatch(const input_t& input) 
     { 
     // Get current thread id 
     boost::thread::id id0 = boost::this_thread::get_id(); 

     // Get the right instance 
     Simulation* sim = NULL;   
     { 
      boost::mutex::scoped_lock scoped_lock(_mutex); 
      IDMapIterator_t it = _IDMap.find(id0); 
      if(it != _IDMap.end()) 
      sim = _simuInst[it->second].get(); 
     } 

     // Simulation 
     if(NULL != sim) 
      sim->eval(input); 
     } 


     // Distributed evaluation. 
     void eval(const std::vector<input_t>& inputs, std::vector<output_t>& outputs) 
     { 
     //--Initialisation 
     const std::size_t NInputs = inputs.size(); 

     // Clear the ouptuts f(contained in instances of class Simulation) from a previous run 
     ... 

     // Create thread pool and save ids 
     boost::asio::io_service io_service; 
     boost::asio::io_service::work work(io_service); 
     boost::thread_group threads; 
     for (std::size_t i = 0; i < _NThreads; ++i) 
     { 
      boost::thread* thread_ptr = threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service)); 
      _IDMap[ thread_ptr->get_id() ] = i; 
     } 

     // Add tasks 
     for(std::size_t i = 0; i < NInputs; ++i) 
      io_service.post(boost::bind(&DistributedSimulation::dispatch, this, inputs[i])); 

     // Stop the service 
     io_service.stop(); 
     threads.join_all(); 

     // Gather results (contained in each instances of class Simulation) 
     ... 
     } 
    }; 
0

这应该适用于您的应用程序。当您拨打io_service.post时,您将以inputs[i]作为参数传入模拟功能。在该函数中(大概是Simulation的成员函数),只需将计算结果存储在Simulation对象中,然后在加入线程以收集输出后迭代对象。

如果您需要确定执行工作的特定线程,您也可以将i作为参数。这假设在模拟完成后收集输出是可以的。

如果您在运行时需要访问输出,只需根据需要将函数post的输出任务改为io_service即可。确保使用互斥体保护任何共享数据结构!

+0

谢谢你的回答。 但是我不知道如何使用类Simulation的N个实例来确保线程'i'用实例'i'计算结果(因此使用它自己的数据来完成工作)。 处理输出的方式似乎很好,谢谢! –

+0

@ gleeen.gould你能说清楚为什么使用线程'i'来计算模拟'i'的工作吗?这是可能的(当你创建它时调用'thread-> get_id()',把它传递给'simulation [i]',然后让'simulation [i]'的函数检查它的执行时间,'boost :: this_thread :: get_id()'匹配,如果没有返回,否则继续),但我不认为这是必要的。 – Tres

+0

看看我的解决方案,我希望这会澄清我的目标。 –