2017-02-24 50 views
1

我想在Julia中实现一个简单的并行算法,用于矩阵上的乘加操作。这对于了解如何在Julia中进行并行计算而非实用性更有帮助。并行计算SharedArray

最终,这应该实现C = C + A*B其中ABC是矩阵。我使用的代码是:

function MultiplyAdd!(C::SharedArray{Int64,2}, A::SharedArray{Int64,2}, B::SharedArray{Int64,2}) 
    assert(size(A)[2]==size(B)[1]) 

    const p = size(A)[1] #rows(A), rows(C) 
    const m = size(A)[2] #cols(A), rows(B) 
    const n = size(B)[2] #cols(C), cols(B) 

    # thresh is some threshold based on the number of operations required 
    # to compute C before we should switch to parallel computation. 
    const thresh = 190 
    M = 2*p*m*n 

    if (M < thresh) 
     C += A*B # base case for recursion 
    elseif (n >= max(p,m)) 
    # Partition C and B (Vertical Split) 
      if (iseven(p)) 
       #even partition of C and B 
       #MultiplyAdd!(C0, A, B0) 
       #MultiplyAdd!(C1, A, B1) 
       a = @spawn MultiplyAdd!(C[:,1:convert(Int64,p/2)], A, B[:,1:convert(Int64,p/2)]) 
       b = @spawn MultiplyAdd!(C[:,1:convert(Int64,p-(p/2))], A, B[:,1:convert(Int64,p-(p/2))]) 
       fetch(a), fetch(b) 
      else 
       #odd parition of C and B 
       a = @spawn MultiplyAdd!(C[:,1:convert(Int64,p/2+0.5)], A, B[:,1:convert(Int64,p/2+0.5)]) 
       b = @spawn MultiplyAdd!(C[:,1:convert(Int64,p-(p/2+0.5))], A, B[:,1:convert(Int64,p-(p/2+0.5))]) 
       fetch(a), fetch(b) 
      end 

    elseif (p >= m) 
    # Partition C and A (Horizontal Split) 
      if (iseven(n)) 
       #even partition of C and A 
       #MultiplyAdd!(C0, A0, B) 
       #MultiplyAdd!(C1, A1, B) 
       a = @spawn MultiplyAdd!(C[1:convert(Int64,n/2),:],A[1:convert(Int64,n/2),:],B) 
       b = @spawn MultiplyAdd!(C1 = C[1:convert(Int64,n-(n/2)),:], A[1:convert(Int64,n-(n/2)),:], B) 
       fetch(a), fetch(b) 
      else 
       #odd parition of C and A 
       # MultiplAdd!(C0, A0, B) 
       # MultiplyAdd!(C1, A1, B) 
       a = @spawn MultiplyAdd!(C[1:convert(Int64,n/2 + 0.5),:], A[1:convert(Int64,n/2),:], B) 
       b = @spawn MultiplyAdd!(C[1:convert(Int64,n-(n/2 + 0.5)),:], A[1:convert(Int64,n-(n/2 + 0.5)),:], B) 
       fetch(a), fetch(b) 
      end 
    else 
     #Begin Serial Recursion 
     # A is Vertical Split, B is Horizontal Split 
     #MultiplyAdd!(C,A0,B0) 
     #MultiplyAdd!(C,A1,B1) 
     if (iseven(m)) 
      MultiplyAdd!(C,A[:,1:convert(Int64,m/2)], B[1:convert(Int64,m/2),:]) 
      MultiplyAdd!(C,A[:,1:convert(Int64,m-(m/2))], B[1:convert(Int64,m-(m/2)),:]) 
     else 
      MultiplyAdd!(C,A[:,1:convert(Int64,m/2 + 0.5)], B[1:convert(Int64,m/2 + 0.5), :]) 
      MultiplyAdd!(C,A[:,1:convert(Int64,m-(m/2 + 0.5))], B[1:convert(Int64,m-(m/2 + 0.5)), :]) 
     end  

    end 
end 

首先,这并不在所有的工作。我运行它时出现以下错误。

LoadError: On worker 5: 
UndefVarError: #MultiplyAdd! not defined 
in deserialize_datatype at ./serialize.jl:823 
in handle_deserialize at ./serialize.jl:571 
in collect at ./array.jl:307 
in deserialize at ./serialize.jl:588 
in handle_deserialize at ./serialize.jl:581 
in deserialize at ./serialize.jl:541 
in deserialize_datatype at ./serialize.jl:829 
in handle_deserialize at ./serialize.jl:571 
in deserialize_msg at ./multi.jl:120 
in message_handler_loop at ./multi.jl:1317 
in process_tcp_streams at ./multi.jl:1276 
in #618 at ./event.jl:68 
while loading In[32], in expression starting on line 73 

in #remotecall_fetch#606(::Array{Any,1}, ::Function, ::Function, ::Base.Worker, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1070 
in remotecall_fetch(::Function, ::Base.Worker, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1062 
in #remotecall_fetch#609(::Array{Any,1}, ::Function, ::Function, ::Int64, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1080 
in remotecall_fetch(::Function, ::Int64, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1080 
in call_on_owner(::Function, ::Future, ::Int64, ::Vararg{Int64,N}) at ./multi.jl:1130 
in fetch(::Future) at ./multi.jl:1156 
in MultiplyAdd!(::SharedArray{Int64,2}, ::SharedArray{Int64,2}, ::SharedArray{Int64,2}) at ./In[32]:24 

其次,它似乎对我来说,我不应该运行两个@spawn任务。我应该让第二个在这些情况下都是MultiplyAdd!(C,A,B)调用。换句话说,只需分配afetch(a)

三,Julia通过引用将数组传递给函数,所以并不是所有的操作都自然地在相同的C,AB矩阵上运行?既然这样,采取切片如:

C0 = C[:, 1:p/2] 
C1 = C[:, 1:p-p/2] 

创建一个全新的对象,这解释了服用在上面的代码中的函数调用的内部的片。实质上,我避免了赋值尝试并始终对同一个对象进行操作。必须有比我实施的更好的方法来做到这一点。最终,我想要对内存中的相同数据进行操作,并将“在阵列上移动放大镜”以并行操作其子集。

+3

在函数定义之前放置'@ everywhere',这样函数就可以在每个进程中定义。另外,你可能想看看在基本情况下使用'muladd'。 –

+0

我昨天问的问题http://stackoverflow.com/questions/42422209/why-is-constness-not-respected-inside-these-julia-functions在这里似乎也是相关的 –

+0

'@ everywhere'固定那个错误。之后出现第二个错误,因为混合阵列和SharedArrays。我预先在递归函数调用中将'转换(SharedArray')为每一个片,现在它运行但是给出了错误的答案,尽管至少它完成了计算,我将检查片的逻辑以了解为什么是这种情况 – xave

回答

1

在这里很难帮助你,因为你没有真正问过问题。冒着听起来居高临下的风险,我建议你偷看suggestions问一个好的SO问题。

至于你的代码,你的方法有几个问题。

  1. 作为编码,MultiplyAdd!只能并行化到最多两个工人。
  2. MultiplyAdd!执行几个呼叫,如A[:,1:n]分配新阵列。
  3. 本着这种精神,呼吁像A[:,1:n]将使Array对象,而不是SharedArray对象,所以递归调用MultiplyAdd!与严格的类型来SharedArray{Int,2}参数将无法正常工作。
  4. 最后,MultiplyAdd!不尊重SharedArray对象的索引方案。

最后一点很重要。要求工作人员1访问分配给工作人员2的AB的部分需要数据传输,这会消除并行化代码的任何性能收益。你可以通过你的SharedArray对象A上运行

for i in procs(A) 
    @show i, localindexes(A) 
end 

看到这一点。每个工作人员i理想情况下应该只在自己的本地索引上操作,尽管在本地索引边界允许数据共享可以帮助您节省一些簿记头痛的问题。

如果你坚持使用SharedArray作为你的原型,那么你仍然有选择。 SharedArraydocumentation有一些很好的建议。我发现,构建

function f(g, S::SharedArray) 
    @sync begin 
     for p in procs(S) 
      @async begin 
       remotecall_fetch(g, p, S, p) 
      end 
     end 
    end 
    S 
end 

一些内核函数g(例如MultiplyAdd!)通常会在所有参与的工作人员并行操作很好。显然你必须决定如何分割执行;文档中的advection_shared!示例是一个很好的指导。

您也可以考虑使用Julia的native multithreading。该并行框架与共享内存计算有点不同。但是,它允许您使用熟悉的迭代构造直接操作Array对象。

+0

最终,我并不是在寻找性能,我只是想在Julia中并行实现一个算法来理解这个过程,它纯粹是一个教育练习,这个算法非常简单,如果我不能做到这一点,我不能做任何不平凡的事情,我不需要使用SharedArray,我只需要设置代码,使其按预期工作,这是一个Multiply的并行实现 - 加入Julia我们完全可以如有必要,可以使用所有代码。 – xave