2011-09-29 27 views
5

在编程二郎本书的一章“编程多核CPU”,乔·阿姆斯特朗给出了一个地图功能的并行化的一个很好的例子:如何优化Erlang中数千条消息的接收循环?

pmap(F, L) -> 
    S = self(), 
    %% make_ref() returns a unique reference 
    %% we'll match on this later 
    Ref = erlang:make_ref(), 
    Pids = map(fun(I) -> 
     spawn(fun() -> do_f(S, Ref, F, I) end) 
    end, L), 
    %% gather the results 
    gather(Pids, Ref). 

do_f(Parent, Ref, F, I) -> 
    Parent ! {self(), Ref, (catch F(I))}. 

gather([Pid|T], Ref) -> 
    receive 
     {Pid, Ref, Ret} -> [Ret|gather(T, Ref)] 
    end; 

gather([], _) -> 
    []. 

它工作得很好,但我相信这是它的一个瓶颈造成它在超过100,000个元素的列表上工作的速度非常慢。

当执行gather()函数时,它将开始将Pids列表中的第一个Pid与主进程邮箱中的邮件进行匹配。但是如果邮箱中最旧的邮件不是来自这个Pid?然后它会尝试所有其他消息直到找到匹配。也就是说,在执行gather()函数时,我们必须遍历所有邮箱消息才能找到与我们从Pids列表中取得的Pid匹配的一个概率。这是N * N最坏的情况下尺寸为N的列表

我甚至设法证明这个瓶颈的存在:

gather([Pid|T], Ref) -> 
    receive 
     {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]; 
     %% Here it is: 
     Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid]) 
    end; 

我怎样才能避免这种瓶颈?

+0

似乎是一个非常简单的问题,惊讶还是有对此没有很好的答案。 – cnst

回答

1

在这种情况下,您可以使用dict(从衍生进程的pid到原始列表中的索引)替代为Pids

+0

您链接到设置手动,但文中说这是字典。它应该是哪一个? – gleber

+0

问题是连接其在初始列表参数每个答案。如果你使用'dict',那么这很容易。否则,订单就会变得更加困难。 – rvirding

+0

@gleber:固定。我本来有套,然后意识到你需要保持索引。 –

3

的问题是,如果你想有一个正确的解决方案,你仍然要:

  • 检查,如果给定的回复来自你的方法之一 催生
  • 确保正确的结果为了

这是一个使用计数器而不是列表的解决方案 - 这消除了多次遍历收件箱的必要性。 Ref的匹配确保我们收到的消息来自我们的孩子。通过在pmap的末尾对lists:keysort/2排序结果来确保正确的订单,这会增加一些开销,但可能会小于O(n^2)

-module(test). 

-compile(export_all). 

pmap(F, L) -> 
    S = self(), 
    % make_ref() returns a unique reference 
    % we'll match on this later 
    Ref = erlang:make_ref(), 
    Count = lists:foldl(fun(I, C) -> 
           spawn(fun() -> 
               do_f(C, S, Ref, F, I) 
             end), 
           C+1 
         end, 0, L), 
    % gather the results 
    Res = gather(0, Count, Ref), 
    % reorder the results 
    element(2, lists:unzip(lists:keysort(1, Res))). 


do_f(C, Parent, Ref, F, I) -> 
    Parent ! {C, Ref, (catch F(I))}. 


gather(C, C, _) -> 
    []; 
gather(C, Count, Ref) -> 
    receive 
     {C, Ref, Ret} -> [{C, Ret}|gather(C+1, Count, Ref)] 
    end. 
+0

它使用'名单:foldl',而不是'map',你可能还没有实现自己。看一看在'人lists'或在书它的定义/实现(我相信它的存在)。 – gleber

2

乔的例子很整洁,但实际上你想要一个更重量级的解决方案来解决你的问题。以http://code.google.com/p/plists/source/browse/trunk/src/plists.erl为例。

在一般情况下,有你想要做的三件事情:

  1. 选择一个工作单位是“足够大”。如果工作单元太小,则会因处理开销而死亡。如果它太大,你会因为工人闲置而死亡,特别是如果你的工作没有被列表中的元素数量平均分配。

  2. 同时工的数量上限。 Psyeugenic提议通过调度程序来分割它,我建议按工作计数限制分割它,100个工作岗位说。也就是说,你想开始100个工作,然后等到其中一些完成后再开始更多的工作。

  3. 考虑拧紧元素的顺序,如果可能的话。如果您不需要考虑订单,速度会快得多。对于许多问题,这是可能的。如果订单事情的话,那么使用dict的东西存储在提议。大元素列表的速度更快。

基本规则是,只要想要并行,就很少需要基于列表的数据表示。该列表具有固有的线性,您不需要。上有很专题演讲由Guy Steele的:http://vimeo.com/6624203