2015-09-15 19 views
11

我想在大列表上做一个并行地图​​。代码看起来有点像这样:Elixir Stream中的Task.async

big_list 
|> Stream.map(&Task.async(Module, :do_something, [&1])) 
|> Stream.map(&Task.await(&1)) 
|> Enum.filter filter_fun 

但我检查流实现而据我了解Stream.map结合了功能和应用组合功能的流,这将意味着顺序是这样的内容:

  1. 以第一要素
  2. 创建异步任务
  3. 等待它完成
  4. 采取第二elelemnt ...

在这种情况下,它不会做并行。我是对的还是缺少什​​么?

如果我是正确的,那这个代码?

Stream.map Task.async ... 
|> Enum.map Task.await ... 

这是否会平行运行?

+2

阅读本 - http://www.theerlangelist.com/2015/07/beyond-taskasync.html – emaillenin

回答

9

第二个也不会做你想做的。您可以使用此代码看得很清楚:

defmodule Test do 
    def test do 
    [1,2,3] 
    |> Stream.map(&Task.async(Test, :job, [&1])) 
    |> Enum.map(&Task.await(&1)) 
    end 

    def job(number) do 
    :timer.sleep 1000 
    IO.inspect(number) 
    end 
end 

Test.test 

你会看到一个数字,然后1周秒钟的等待,另一个号码,等等。这里的关键是你想尽快创建任务,所以你根本不应该使用 懒惰Stream.map。而是使用急于Enum.map在这一点上:

|> Enum.map(&Task.async(Test, :job, [&1])) 
|> Enum.map(&Task.await(&1)) 

在另一方面等待当你做一些急于操作后,像你filter可以使用Stream.map,只要。这样,等待将会穿插任何你可能对结果进行的处理。

4

药剂1.4提供了新的Task.async_stream/5函数将返回在可枚举的每一个项目同时运行一个给定函数的流。

还可以使用:max_concurrency:timeout选项参数指定工作人员的最大数量和超时时间。


这会让你的例子同时运行:

big_list 
|> Task.async_stream(Module, :do_something, [&1]) 
|> Enum.filter(filter_fun) 
0

你可以试试Parallel Stream

stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end) 
stream |> Enum.into([]) 
[2,4,6,8,10,12,14,16,18,20] 

UPD 或者更好地利用Flow