0

我已经在C++中实现了一个多线程合并排序,但是我碰到了一堵墙。如何在多线程时停止用尽堆栈空间?

在我的实现,我递归地拆分输入向量分成两个部分,然后线程这两个部分:

void MergeSort(vector<int> *in) 
{ 
if(in->size() < 2) 
    return; 

vector<int>::iterator ite = in->begin(); 
vector<int> left = vector<int> (ite, ite + in->size()/2); 
vector<int> right = vector<int> (ite + in->size()/2, in->end()); 

//current thread spawns 2 threads HERE 
thread t1 = thread(MergeSort, &left); 
thread t2 = thread(MergeSort, &right); 

t1.join(); 
t2.join(); 

vector<int> ret; 
ret.reserve(in->size()); 

ret = MergeSortMerge(left, right); 

in->clear(); 
in->insert(in->begin(), ret.begin(), ret.end()); 

return; 
} 

的代码看起来是很漂亮,但它是最恶毒的代码我有一个曾经写过。试图排序一个超过1000个int值的数组导致这么多的线程产生,我用完堆栈空间,我的电脑BSOD :(一致。很多线程,这不是很好,但技术上(如果不是理论上),这是不是一个适当的实施?

基于一点谷歌搜索,我似乎已经发现需要一个线程池。线程池解决我遇到的基本问题,我试图产生太多的线程的事实?如果是这样,你有任何关于库的建议?

谢谢你的建议和帮助!

+4

使用线程的一个原因是使用计算机中的所有内核。在这种情况下,创建比核心更多的线程没有任何价值。 –

+1

即使这个工作,它会很慢,递归地产生线程肯定听起来像是一场灾难的秘诀。正如Brian所提到的,一个想法是将线程限制为'std :: thread :: hardware_concurrency'的值,但即使如此,我也不确定这会比'std :: sort'更快。 – user2802841

+0

你不应该能够从这引起BSOD ......什么操作系统?假设Windows ...什么版本? (对于我自己的好奇心,我同意已发布的答案) – TypeIA

回答

1

由于zdan解释,你要限制的线程数。有两点需要考虑,以确定什么是极限,

  1. CPU核心的数量。在C++ 11中,您可以使用std::thread::hardware_concurrency()来确定硬件核心。然而,这个函数可能返回0意味着程序不知道有多少核心,在这种情况下,你可能会认为这个值是2或4.

  2. 受限于被处理数据的数量。您可以将要由线程处理的数据划分为每个线程1个数据,但只有1个数据会花费太多,并且不具有成本效益。例如,你可以说,当数据的数量小于50时,你不想再划分。因此,您可以根据total_data_number/50 + 1之类的内容确定所需的最大线程数。

然后,您选择案例1中的最小数字&案例2来确定限制。

在你的情况,因为你是递归生成的线程,你可以尝试确定以类似的方式递归深度。

+0

啊,所以我会手动设置递归深度的数量,然后让每个线程使用尽可能多的数据。谢谢你的解释,这非常有帮助。 – lululoo

1

我不认为一个线程池是要帮助你。由于您的算法是递归的,因此您将得到池中所有线程都被占用的点,并且池不会再创建任何线程,并且算法会被阻塞。

你很可能只是限制你的线程创建递归深度为2或3(除非你有CPU的的。它不会让任何性能上的差异)。

+0

我看到了...所以我的设计必须考虑到我的线程数量有限,因为我试图利用多个线程。 T_T 其实,你知道我会怎么去限制这种计算的递归深度吗? – lululoo

0

你可以设置你的堆栈空间的限制,但它是徒劳的。线程太多,即使有一个池,也会以log2(N)*每线程成本将其吞掉。寻找迭代方法并减少开销。开销是杀手。 就性能而言,您会发现使用N个线程的过度提交的某个级别,其中硬件并发性可能会产生最佳结果。开销和每核心工作之间会有很好的平衡。如果N非常大,比如在GPU上,那么存在其他选项(Bitonic),这些选项会进行不同的折衷以减少通信(等待/连接)开销。

假设你有一个任务管理器,是池莉构建N个允许通过等待任务之前通知信号, `

#include <algorithm> 
#include <array> 
#include <cstdint> 
#include <vector> 
#include <sometaskmanager.h> 

void parallel_merge(size_t N) { 
    std::array<int, 1000> ary {0}; 
    // fill array... 
    intmax_t stride_size = ary.size()/N; //TODO: Put a MIN size here 
    auto semaphore = make_semaphore(N); 
    using iterator = typename std::array<int, 1000>::iterator; 
    std::vector<std::pair<iterator, iterator>> ranges; 
    auto last_it = ary.begin(); 
    for(intmax_t n=stride_size; n<N; n +=stride_size ) { 
     ranges.emplace_back(last_it, std::next(last_it, std::min(std::distance(last_it, ary.end()), stride_size))); 
     semaphore.notify(); 
    } 
    for(auto const & rng: ranges) { 
     add_task([&semaphore,rng]() { 
     std::sort(rng.first, rng.second); 
     }); 
    } 
    semaphore.wait(); 
    std::vector<std::pair<iterator, iterator>> new_rng; 
    while(ranges.size() > 1) { 
     semaphore = make_semaphore(ranges.size()/2); 
     for(size_t n=0; n<ranges.size(); n+=2) { 
      auto first=ranges[n].first; 
      auto last=ranges[n+1].second; 
      add_task([&semaphore, first, mid=ranges[n].second, last]() { 
       std::inplace_merge(first, mid, last); 
       semaphore.notify(); 
      }); 
      new_rng.emplace_back(first, last); 
     } 
     if(ranges.size() % 2 != 0) { 
      new_rng.push_back(ranges.back()); 
     } 
     ranges = new_rng; 
     semaphore.wait(); 
    } 
} 

正如你所看到的,瓶颈是在合并阶段的有必须做很多协调工作。 Sean Parent做了一个很好的介绍,如果你没有一个任务管理器,并且在他的演示文稿Better Code:Concurrency,http://sean-parent.stlab.cc/presentations/2016-11-16-concurrency/2016-11-16-concurrency.pdf的演示文稿中介绍了相关的性能分析, TBB和PPL有任务经理。