2012-09-27 58 views
1

我想用多个处理器并行化线性运算(将复杂的数学函数拟合到某个数据集)。拟合调度系统

假设我的机器上有8个内核,我想要适合1000个数据集。我所期望的是一些系统将1000个数据集作为队列,并将它们发送到8个内核进行处理,因此它以从1000开始的前8个为FIFO开始。每个数据集的拟合时间通常与其他数据集的拟合时间不同,因此,拟合的8个数据集中的一些可能需要比其他时间更长的时间。我希望从系统中保存拟合数据集的结果,然后从每个已完成线程的大队列(1000个数据集)中继续获取新数据集。这必须恢复,直到处理完整个1000个数据集。然后我可以继续我的程序。

这是什么叫做系统?并有C++的模型?

我与OpenMP并行,并使用高级C++技术,如模板和多态。

谢谢你的努力。

回答

1

您可以将OpenMP并行用于动态计划或OpenMP任务。两者都可以用来平行每次迭代花费不同时间完成的情况。随着动态调度为:

#pragma omp parallel 
{ 
    Fitter fitter; 
    fitter.init(); 
    #pragma omp for schedule(dynamic,1) 
    for (int i = 0; i < numFits; i++) 
     fitter.fit(..., &results[i]); 
} 

schedule(dynamic,1),使每个线程同时执行一个迭代,除非有没有更多的迭代处理线程永远不会闲置。

随着任务:

#pragma omp parallel 
{ 
    Fitter fitter; 
    fitter.init(); 
    #pragma omp single 
    for (int i = 0; i < numFits; i++) 
    { 
     #pragma omp task 
     fitter.fit(..., &results[i]); 
    } 
    #pragma omp taskwait 
    // ^^^ only necessary if more code before the end of the parallel region 
} 

在这里,其中一个线程运行的for循环产生1000个OpenMP的任务。 OMP任务保存在队列中并由空闲线程处理。它的作用有点类似于动态for循环,但允许在代码构造中有更大的自由度(例如,可以使用递归算法并行化的任务)。taskwait构造等待所有未完成的任务完成。它隐含在并行区域的末尾,所以只有在并行区域结束之前有更多的代码出现时才需要它。

在这两种情况下,每个调用fit()都将在不同的线程中完成。您必须确保拟合一组参数不会影响拟合其他组,例如fit()是一个线程安全的方法/函数。这两种情况都需要执行fit()的时间远高于OpenMP构造的开销。

OpenMP任务需要符合OpenMP 3.0的编译器。这就排除了MS VC++的所有版本(甚至是VS2012中的版本),如果你碰巧在Windows上开发的话。

如果您希望每个线程只有一个fitter实例被初始化,那么您应该采取一些不同的方法,例如,使钳工对象全局和threadprivate

#include <omp.h> 

Fitter fitter; 
#pragma omp threadprivate(fitter) 

... 

int main() 
{ 
    // Disable dynamic teams 
    omp_set_dynamic(0); 

    // Initialise all fitters once per thread 
    #pragma omp parallel 
    { 
     fitter.init(); 
    } 

    ... 

    #pragma omp parallel 
    { 
     #pragma omp for schedule(dynamic,1) 
     for (int i = 0; i < numFits; i++) 
     fitter.fit(..., &results[i]); 
    } 

    ... 

    return 0; 
} 

这里fitterFitter类的全局实例。 omp threadprivate指令指示编译器将其放入线程本地存储器,例如,使它成为每个线程的全局变量。这些持续存在于不同的平行区域之间。您也可以在static局部变量上使用omp threadprivate。 (但仅在相同的功能)不同的并行区域之间的这些太坚持:

#include <omp.h> 

int main() 
{ 
    // Disable dynamic teams 
    omp_set_dynamic(0); 

    static Fitter fitter; // must be static 
    #pragma omp threadprivate(fitter) 

    // Initialise all fitters once per thread 
    #pragma omp parallel 
    { 
     fitter.init(); 
    } 

    ... 

    #pragma omp parallel 
    { 
     #pragma omp for schedule(dynamic,1) 
     for (int i = 0; i < numFits; i++) 
     fitter.fit(..., &results[i]); 
    } 

    ... 

    return 0; 
} 

omp_set_dynamic(0)呼叫禁用动态小组,即作为由OMP_NUM_THREADS环境变量指定的每个并行区域将总是与尽可能多的线程中执行。

+0

非常感谢您的回答。但问题比这更复杂一点。我有一个具有成员函数来完成拟合的类。如果同时使用多个线程,则类实例不是线程安全的。我能想到的解决方案是每个线程都有一个适配类的实例;但我不知道如何做到这一点。你能否告诉我是否有可能告诉程序每个线程必须接受一个类的prive实例,并用一些函数init(...)初始化它,并使用正确的参数,我知道每个数据集都适合我? –

+0

还有一件事。为了提高效率,我希望在程序开始时只运行init(...)一次。它包含一些非常昂贵的矩阵分配。谢谢! –

+1

所有声明_inside_并行区域的变量都是线程本地的,即每个线程都获得这些变量的私有副本。声明_outside_(和之前)并行区域的变量默认共享,除非'omp parallel'构造中指定了'private(varname)'子句。我会修改我的答案以反映这一点。 –

1

,你基本上要的是工人的池(或线程池),它采取的工作从队列中,对它进行处理,并随后用另一份工作继续进行。 OpenMP提供了处理这些任务的不同方法,例如障碍(所有工作人员一直运行到某一点,只有在满足某些要求时才执行),或在工人设法计算其各自部分后,将值累加到全局变量中。

你的问题非常宽泛,但我可以给你的另一个提示是看看MapReduce范例。在这个范例中,函数被映射到数据集上,并且结果被排序到使用另一个函数(可能再次是相同函数)中减少的桶中。在你的情况下,这意味着你的每个处理器/内核/节点将给定的函数映射到其分配的一组数据上,并将结果桶发送到另一个负责组合它的节点。我想如果你想使用MapReduce和C++而不使用特定的MapReduce框架,你必须研究MPI。当你在一个节点上运行程序时,也许你可以用OpenMP做类似的事情,所以在网上搜索可能会有所帮助。

TL; DR搜索工人的池(线程池),障碍的MapReduce

+0

+1或许应该提到“线程池”而不是“工人池” – Brady