2016-11-20 118 views
1

我有一个可变大小的大向量。我想检查每个元素(在矢量的特定索引范围lowerRange-upperRange内)是否满足某个条件?在下面的例子中,我的输入向量包含9个元素,我想检查从2到6的元素是否满足check_if_condition()。在这里,lowerRange = 2和upperRange = 6在向量C++中并行化搜索

为此,我写了下面的并行代码来做同样的事情,但是,这个代码的问题是它给出了错误:“glibc detect smallbin linked list corrupted”。我尝试使用valgrind调试代码,但我无法找到错误的确切原因。

我的实际的实际输入向量包含10000000个元素,我想检查999999(lowerRange)-9999999(upperRange)之间的元素(这个范围是由用户指定的,尽管我已经把这个范围当作代码中的常量)索引元素满足check_if_condition

#include <thread> 
#include <vector> 
#include <iostream> 
#include <atomic> 

unsigned check_if_condition(int a) 
{ 
    //Long check here 
    return 1; 
} 

void doWork(std::vector<unsigned>& input, std::vector<unsigned>& results, unsigned assigned, size_t current, size_t end, std::atomic_int& totalPassed) 
{ 
    end = std::min(end, input.size()-2); 
    int numPassed = 0;  
    for(; (current) < end; ++current) { 
     if(check_if_condition(input[current])) { 
      results[current] = true; 
      ++numPassed; 
     } 
    } 

    totalPassed.fetch_add(numPassed); 
} 

int main() 
{ 
    std::vector<unsigned> input;//(1000000); 
    input.push_back(0); input.push_back(1); input.push_back(2); input.push_back(3); input.push_back(4); input.push_back(5); input.push_back(6); input.push_back(7); input.push_back(8); 
    std::vector<unsigned> results(input.size()); 
    std::atomic_int numPassed(0);   
    auto numThreads = std::thread::hardware_concurrency();  
    std::vector<std::thread> threads; 
    unsigned assigned; 

    if(numThreads> input.size()) 
     numThreads=input.size(); 
    std::cout<<"numThreads="<<numThreads<<"\n"; 
    auto blockSize = input.size()/numThreads; 
    for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition 
     threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed)); 


    for(auto& thread : threads) 
     thread.join(); 


    std::vector<int> storage; 
    storage.reserve(numPassed.load()); 

    auto itRes = results.begin(); 
    auto itInput = input.begin(); 
    auto endRes = results.end(); 
    for(; itRes != endRes; ++itRes, ++itInput) { 
     if(*itRes) 
      storage.emplace_back(*itInput);    
    } 

    std::cout<<"\n Storage:"; 
    for(std::vector<int>::iterator i1=storage.begin(), l1=storage.end(); i1!=l1; ++i1) 
     std::cout<<" "<<(*i1)<<"\n"; 

    std::cout << "Done" << std::endl; 
} 
+0

您需要确保在向量子范围上工作的努力不超过向量子范围本身的工作。使用10个线程和1000万个元素,每个线程将有100万个元素;如果工作是每个元素几十个机器指令,那么每个线程都会有1亿条指令,这应该足够多。如果你只有10万个元素,那么你可能没有足够的资源来完成这项工作,这将是一个放缓的过程。真的世界应用程序中真的有1000万个元素吗? –

回答

3

要检查end但不currentdoWork,因此你对

for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition 
     threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed)); 

你的最后一次迭代阅读过去的矢量比方说你的载体是1000元大,你的线程数是8,在最后一次迭代中,您将获得:

i = 7;

current =(7 + 2)* 125 = 1125;

end =(7 + 3)* 125 = 1250;


所以要均匀地分配工作的线程之间对于给定的子范围[rangeStart,则rangeEnd的),你需要执行下面的循环:

for(size_t i = 0; i < numThreads; ++i) 
{ 
    auto start = rangeStart + i * blockSize; 
    auto end = (i == numThreads - 1) ? rangeEnd : start + (i+1) * blockSize; 
    threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned, start, end, std::ref(numPassed)); 
} 

注意,在最后一次迭代end直接设置为rangeEnd的使最后一个线程可能略有更多的工作要做

此外,块大小应调整:

auto blockSize = (rangeEnd - rangeStart)/numThreads; 
+0

ok..thanks很多...我如何修复它 –

+1

首先所有的块大小应(rangeEnd的 - 具有rangeStart)/ numOf线程 –

+0

二的所有电流='范围+ I * blockSize'和'结束=启动+(1 + 1)* blockSize' –