2016-07-03 36 views
1

我写了一个示例代码来运行for_each的并行实例 我无法加入线程,在下面的代码中。我对并发编程有点早,所以我不知道我是否做得对。线程无法加入for_each并行C++

template <typename Iterator, typename F> 
class for_each_block 
{ 
public : 
     void operator()(Iterator start, Iterator end, F f) { 
      cout << this_thread::get_id << endl; 
      this_thread::sleep_for(chrono::seconds(5)); 
      for_each(start, end, [&](auto& x) { f(x); }); 
    } 
}; 

typedef unsigned const long int ucli; 

template <typename Iterator, typename F> 
void for_each_par(Iterator first, Iterator last, F f) 
{ 
    ucli size = distance(first, last); 
    if (!size) 
     return; 
    ucli min_per_thread = 4; 
    ucli max_threads = (size + min_per_thread - 1)/min_per_thread; 
    ucli hardware_threads = thread::hardware_concurrency(); 

    ucli no_of_threads = min(max_threads, hardware_threads != 0 ? hardware_threads : 4); 

    ucli block_size = size/no_of_threads; 

    vector<thread> vf(no_of_threads); 
    Iterator block_start = first; 
    for (int i = 0; i < (no_of_threads - 1); i++) 
    { 
     Iterator end = first; 
     advance(end, block_size); 
     vf.push_back(std::move(thread(for_each_block<Iterator, F>(),first,end,f))); 
     first = end; 
    } 
    vf.push_back(std::move(thread(for_each_block<Iterator, F>(), first, last, f))); 
    cout << endl; 
    cout << vf.size() << endl; 
    for(auto& x: vf) 
    { 
     if (x.joinable()) 
      x.join(); 
     else 
      cout << "threads not joinable " << endl; 
    } 

    this_thread::sleep_for(chrono::seconds(100)); 
} 

int main() 
{ 
    vector<int> v1 = { 1,8,12,5,4,9,20,30,40,50,10,21,34,33 }; 
    for_each_par(v1.begin(), v1.end(), print_type<int>); 
return 0; 
} 

在上面的代码中,我得到线程无法连接。我也试过异步期货,但我仍然保持一致。我在这里错过了什么吗?

任何帮助是极大的赞赏, 预先感谢您..

回答

4
vector<thread> vf(no_of_threads); 

这将创建no_of_threads默认初始化线程的向量。由于它们是默认初始化的,它们都不会被加入。你可能的意思是:

vector<thread> vf; 
vf.reserve(no_of_threads); 

皮斯坦:std::move上一个临时是多余的:);考虑改变这一点:

vf.push_back(std::move(thread(for_each_block<Iterator, F>(), first, last, f))); 

这样:

vf.emplace_back(for_each_block<Iterator, F>(), first, last, f); 
+0

非常感谢,它现在有效。但我有一个问题。所有线程都打印相同的ID。你也可以请建议更多(我确定我可能做错了一些事情)。另外,我总是对push_back和emplace_back感到困惑,谢谢澄清:) –

+1

@KartikV'this_thread :: get_id'是一个函数,它应该被调用。现在你正在打印函数指针值。 –

+0

@豹子如此真实,我感到很愚蠢。你们两个都睁开了眼睛。我应该仔细观察以避免愚蠢的错误。感谢你们。 –

1

这可能会或可能不会很有趣。我试着重构代码以使用我认为更为习惯的方法。我并不是说你的方法是错误的,但是因为你正在学习线程管理,所以我认为你可能对其他方面有兴趣。

随意点燃火焰/问题。评论在线:

#include <vector> 
#include <chrono> 
#include <thread> 
#include <mutex> 
#include <iomanip> 
#include <future> 

using namespace std; 

// 
// provide a means of serialising writing to a stream. 
// 
struct locker 
{ 
    locker() : _lock(mutex()) {} 

    static std::mutex& mutex() { static std::mutex m; return m; } 
    std::unique_lock<std::mutex> _lock; 
}; 
std::ostream& operator<<(std::ostream& os, const locker& l) { 
    return os; 
} 

// 
// fill in the missing work function 
// 
template<class T> 
void print_type(const T& t) { 
    std::cout << locker() << hex << std::this_thread::get_id() << " : " << dec << t << std::endl; 
} 

// put this in your personable library. 
// the standards committee really should have given us ranges by now... 
template<class I1, class I2> 
struct range_impl 
{ 
    range_impl(I1 i1, I2 i2) : _begin(i1), _end(i2) {}; 

    auto begin() const { return _begin; } 
    auto end() const { return _end; } 

    I1 _begin; 
    I2 _end; 
}; 

// distinct types because sometimes dissimilar iterators are comparable 
template<class I1, class I2> 
auto range(I1 i1, I2 i2) { 
    return range_impl<I1, I2>(i1, i2); 
} 

// 
// lets make a helper function so we can auto-deduce template args 
// 
template<class Iterator, typename F> 
auto make_for_each_block(Iterator start, Iterator end, F&& f) 
{ 
    // a lambda gives all the advantages of a function object with none 
    // of the boilerplate. 
    return [start, end, f = std::move(f)] { 
     cout << locker() << this_thread::get_id() << endl; 
     this_thread::sleep_for(chrono::seconds(1)); 

     // let's keep loops simple. for_each is a bit old-skool. 
     for (auto& x : range(start, end)) { 
      f(x); 
     } 
    }; 
} 


template <typename Iterator, typename F> 
void for_each_par(Iterator first, Iterator last, F f) 
{ 
    if(auto size = distance(first, last)) 
    { 
     std::size_t min_per_thread = 4; 
     std::size_t max_threads = (size + min_per_thread - 1)/min_per_thread; 
     std::size_t hardware_threads = thread::hardware_concurrency(); 

     auto no_of_threads = min(max_threads, hardware_threads != 0 ? hardware_threads : 4); 

     auto block_size = size/no_of_threads; 

     // futures give us two benefits: 
     // 1. they automatically transmit exceptions 
     // 2. no need for if(joinable) join. get is sufficient 
     // 
     vector<future<void>> vf; 
     vf.reserve(no_of_threads - 1); 
     for (auto count = no_of_threads ; --count ;) 
     { 
      // 
      // I was thinking of refactoring this into std::generate_n but actually 
      // it was less readable. 
      // 
      auto end = std::next(first, block_size); 
      vf.push_back(async(launch::async, make_for_each_block(first, end, f))); 
      first = end; 
     } 
     cout << locker() << endl << "threads: " << vf.size() << " (+ main thread)" << endl; 

     // 
     // why spawn a thread for the remaining block? we may as well use this thread 
     // 
     /* auto partial_sum = */ make_for_each_block(first, last, f)(); 

     // join the threads 
     // note that if the blocks returned a partial aggregate, we could combine them 
     // here by using the values in the futures. 
     for (auto& f : vf) f.get(); 
    } 
} 

int main() 
{ 
    vector<int> v1 = { 1,8,12,5,4,9,20,30,40,50,10,21,34,33 }; 
    for_each_par(v1.begin(), v1.end(), print_type<int>); 
    return 0; 
} 

输出样本:

0x700000081000 
0x700000104000 

threads: 3 (+ main thread) 
0x700000187000 
0x100086000 
0x700000081000 : 1 
0x700000104000 : 5 
0x700000187000 : 20 
0x100086000 : 50 
0x700000081000 : 8 
0x700000104000 : 4 
0x700000187000 : 30 
0x100086000 : 10 
0x700000081000 : 12 
0x700000104000 : 9 
0x700000187000 : 40 
0x100086000 : 21 
0x100086000 : 34 
0x100086000 : 33 
Program ended with exit code: 0 

请解释的std ::搬到这里:[start, end, f = std::move(f)] {...};

这是一个可喜的语言功能,它是用C +提供+14。捕获块内的f = std::move(f)相当于:decltype(f) new_f = std::move(f),只是新变量被称为f而不是new_f。它允许我们将std::move对象转换为lambda表达式,而不是复制它们。

对于大多数函数对象来说并不重要 - 但有些可能很大,这给了编译器机会使用移动而不是副本(如果可用)。

+0

这是非常整洁,更衣室,范围真的很酷。你能解释一下你用于函数参数的移动语义吗? –

+0

@KartikV更新。希望有所帮助。 –

+0

非常感谢。你的帮手功能是非常有用的,你可以指向我的博客吗?(或者如果你有时间,你可以写一个) –