2014-05-16 23 views
0

我目前正在写一个物理模拟(t.b.m.p.求解一个随机微分方程),我需要并行化它。
现在这可以通过MPI来实现,我想我将来需要做一些工作,但是现在我想利用我的本地机器的所有8个内核。对于一个参数集,正常运行需要2-17小时。所以我想利用多线程,特别是下面的函数应该并行执行。对于Nsteps时间步长,此功能基本上解决了相同的SDE Nrep次。将结果取平均值并将每个线程存储到Nthreads x Nsteps阵列JpmArr的单独行中。多线程和动态数组/矩阵运算的

double **JpmArr; 
void worker(const dtype gamma, const dtype dt, const uint seed, const uint Nsteps, const uint Nrep,\ 
      const ESpMatD& Jplus, const ESpMatD& Jminus, const ESpMatD& Jz, const uint tId){ 


dtype dW(0), stdDev(sqrt(dt)); 

std::normal_distribution<> WienerDistr(0, stdDev); 

//create the arrays for the values of <t|J+J-|t> 
dtype* JpmExpect = JpmArr[tId]; 

//execute Nrep repetitions of the experiment 
for (uint r(0); r < Nrep; ++r) { 
    //reinitialize the wave function 
    psiVecArr[tId] = globalIstate; 
    //<t|J+J-|t> 
    tmpVecArr[tId] = Jminus* psiVecArr[tId]; 
    JpmExpect[0] += tmpVecArr[tId].squaredNorm(); 
    //iterate over the timesteps 
    for (uint s(1); s < Nsteps; ++s) { 

     //get a random number 
     dW = WienerDistr(RNGarr[tId]); 

     //execute one step of the RK-s.o. 1 scheme 
     tmpPsiVecArr[tId] = F2(gamma, std::ref(Jminus), std::ref(psiVecArr[tId])); 
     tmpVecArr[tId] = psiVecArr[tId] + tmpPsiVecArr[tId] * sqrt(dt); 
     psiVecArr[tId] = psiVecArr[tId] + F1(gamma, std::ref(Jminus), std::ref(Jplus), std::ref(psiVecArr[tId])) * dt + tmpPsiVecArr[tId] * dW \ 
+ 0.5 * (F2(gamma, std::ref(Jminus), std::ref(tmpVecArr[tId])) - F2(gamma, std::ref(Jminus), std::ref(psiVecArr[tId]))) *(dW * dW - dt)/sqrt(dt); 

     //normalise 
     psiVecArr[tId].normalize(); 
     //compute <t|J+J-|t> 
     tmpVecArr[tId] = Jminus* psiVecArr[tId]; 
     JpmExpect[s] += tmpVecArr[tId].squaredNorm(); 
    } 
} 


//average over the repetitions 
for (uint j(0); j < Nsteps; ++j) { 
    JpmExpect[j] /= Nrep; 
} 
} 

我使用Eigen作为线性代数库,因而:

typedef Eigen::SparseMatrix<dtype, Eigen::RowMajor> ESpMatD; 
typedef Eigen::Matrix<dtype, Eigen::Dynamic, Eigen::RowMajor> VectorXdrm; 

被用作类型。上述工函数调用:

VectorXdrm& F1(const dtype a, const ESpMatD& A, const ESpMatD& B, const VectorXdrm& v) { 
z.setZero(v.size()); 
y.setZero(v.size()); 

// z is for simplification 
z = A*v; 

//scalar intermediate value c = <v, Av> 
dtype c = v.dot(z); 

y = a * (2.0 * c * z - B * z - c * c * v); 
return y; 
} 

VectorXdrm& F2(const dtype a, const ESpMatD& A, const VectorXdrm& v) { 
//zero the data 
z.setZero(v.size()); 
y.setZero(v.size()); 

z = A*v; 

dtype c = v.dot(z); 

y = sqrt(2.0 * a)*(z - c * v); 
return y; 
} 

其中矢量z,yVectorXdrm是类型,并且在相同的文件(模块全局)被声明。 所有的数组(RNGarr, JpmArr, tmpPsiVecArr, tmpVecArr, psiVecArr)都在main中初始化(使用main.cpp中的extern声明)。在完成设置后,我使用std::async运行函数,等待所有完成,然后将JpmArr中的数据收集到main()中的单个数组中,并将其写入文件。

问题: 如果我使用std::launch::async,结果是无稽之谈。 如果我使用std::launch::deferred,计算结果和平均结果匹配(直到数值方法允许)我通过分析手段获得的结果。

我不知道东西失败的地方。我曾经使用Armadillo作为线性代数,但它的normalize例程交付了nan's,所以我切换到Eigen,它提示(在文档中)可用于多个线程 - 它仍然失败。 在我花了4天的时间之前没有和线程一起工作,现在试图让这个工作和阅读的东西。后者导致我使用全局数组RNGarr, JpmArr, tmpPsiVecArr, tmpVecArr, psiVecArr(之前我只是试图在worker中创建合适的数组,并将结果通过struct workerResult返回到main。)以及使用std::ref()将矩阵Jplus, Jminus, Jz传递给该函数。 (为了简洁起见,上面的功能被省略了) -

但是我得到的结果仍然是错误的,我不再有任何想法了,什么是错的,我应该怎么做才能获得正确的结果。任何输入和/或指向此类(线程)问题或参考解决方案示例的指针都将不胜感激。

+0

你能展示实际调用启动:: async的代码?能够运行多个线程的关键要求是所有的代码都应该是线程安全的(这很难确定,但是在C++中改变为true比在C中更好),并且每个线程应该在其自己的数据结构上工作(或一些程序上的保证,他们没有彼此踩脚趾) – Soren

+0

'auto ws = std :: async(std :: launch :: deferred,&worker,gamma,dt,RNGseed [s],Nsteps,NrepPt,std :: ref(Jplus),std :: ref(Jminus),std :: ref(Jz),tId ++);'哪里s'是0,1,2,3现在手动设置 – Nox

+0

Jplus,Jminus - 它们是引用其他矩阵?在工作人员正在运行时他们是否会被另一个线程更新? – Soren

回答

0

在每个线程中计算之间显然存在一些相互作用 - 或者是由于您的全局数据正在被多个线程更新,或者虽然通过引用传递的某些结构在运行时发生变异 - zy不能如果它们由多个线程更新,则为全局变量 - 但可能存在许多其他问题

我建议您按以下方式重构代码;

  • 使其面向对象 - 定义一个自包含的类。把所有给予工人的参数,并让他们成为班级的成员。
  • 如果您不确定数据结构是否有变异,请不要通过引用传递它们。如果有疑问承认最差,并在课堂上完成全部副本。
  • 如果线程必须更新共享结构(您的用例中应该没有任何内容),则需要保护互斥读取和互斥写入以进行独占访问。
  • 删除所有全局 - 而不是全球JpmArr,zy,定义类中所需的数据。
  • 使worker,F1,F2你的班级的成员功能。

然后在主,new新生成的类多次,因为你需要,并开始他们每个人作为一个话题 - 确保你正在等待线程你读任何数据之前完成 - 每个线程都有自己的堆栈和类中的自己的数据,因此并行计算之间的干扰变得不太可能。

如果您想进一步优化,您需要考虑每个矩阵计算为job并创建一个与核心数量匹配的线程池,并让每个线程按顺序获取作业,这将减少上下文切换开销并且如果你的线程数量比你的核心数量多很多,CPU L1/L2缓存就会丢失 - 但是,这比现在的问题变得更加复杂。

+0

感谢您的建议。我会通过我们看看他们。顺便说一下,我确信这些矩阵不应该被修改(他们不是),因此我通过了ref。进一步优化将不得不等待,直到这被验证为工作。 – Nox

+0

我重写了代码后,您建议一切正常,并且很容易修改。非常感谢。至于性能优化,欢迎任何进一步的输入。目前我为我的系统使用'g ++'flags'-2 -msse2 -msse4.1 -mtune = core2'来计算CPU的线程数量,但严重的运行时间仍然超过一天(并且仍有代码被添加) – Nox

0

停止使用全局变量。无论如何这是糟糕的风格,在这里多个线程将同时调零并变异zy

最简单的解决方法是用worker函数中的局部变量替换全局变量 - 因此每个并发调用worker都有自己的副本 - 并通过引用将它们传递到F1F2

+0

我的想法是,这将是一个糟糕的想法b/c在这种情况下变量是矢量,因为,据我所知,所有线程共享堆这不是一个好主意。无论如何,这段代码已经经历了许多突变(为简洁起见,未在原始文章中列出)。两个变量(矢量)都是局部的(静态和非静态的),直到当前对代码修改的迭代 - 对解决方案没有任何积极影响。 – Nox

+1

在工人中使用局部变量应该没问题。在多个线程中执行大量动态堆(分配)分配可能表现不佳,因为共享堆必须同步,但正确性没有问题。无论如何,我想你是说你尝试了其他的代码,并得到同样的问题?除非您可以展示一个完整的,自包含的程序来重现这一点,否则没有人可以猜出您的不完整代码的其他版本中可能出现了什么问题。 – Useless

+0

非常感谢您的信息。现在,我已经将'z,y'声明为函数'F1,F2',并更改了这两个函数以返回'y'的副本。结果似乎与现在的预测相符。 – Nox