2011-08-06 113 views
9

我对基于异步的程序中的堆栈溢出感到惊讶。我怀疑的主要问题是用下面的函数,这是为了构成2个异步计算并行执行,并等待这两个完成:F#异步堆栈溢出

let (<|>) (a: Async<unit>) (b: Async<unit>) = 
    async { 
     let! x = Async.StartChild a 
     let! y = Async.StartChild b 
     do! x 
     do! y 
    } 

利用该定义,我有以下mapReduce程序试图利用mapreduce部分的并行性。非正式地,这个想法是使用一个共享通道激发N映射器和N-1 reducer,等待它们完成并从通道读取结果。我有我自己的Channel实施,由ConcurrentBag这里更换为更短的代码(该问题影响):

let mapReduce (map : 'T1 -> Async<'T2>) 
       (reduce : 'T2 -> 'T2 -> Async<'T2>) 
       (input : seq<'T1>) : Async<'T2> = 
    let bag = System.Collections.Concurrent.ConcurrentBag() 

    let rec read() = 
     async { 
      match bag.TryTake() with 
      | true, value -> return value 
      | _   -> do! Async.Sleep 100 
          return! read() 
     } 

    let write x = 
     bag.Add x 
     async.Return() 

    let reducer = 
     async { 
      let! x = read() 
      let! y = read() 
      let! r = reduce x y 
      return bag.Add r 
     } 

    let work = 
     input 
     |> Seq.map (fun x -> async.Bind(map x, write)) 
     |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer) 

    async { 
     do! work 
     return! read() 
    } 

现在以下基本测试开始抛出StackOverflowException上N = 10000:

let test n = 
    let map x  = async.Return x 
    let reduce x y = async.Return (x + y) 
    mapReduce map reduce [0..n] 
    |> Async.RunSynchronously 

编辑:另一种实施<|>组合子使得对N = 10000测试成功:

let (<|>) (a: Async<unit>) (b: Async<unit>) = 
    Async.FromContinuations(fun (ok, _, _) -> 
    let count = ref 0 
    let ok() = 
     lock count (fun() -> 
      match !count with 
      | 0 -> incr count 
      | _ -> ok()) 
    Async.Start <| 
     async { 
      do! a 
      return ok() 
     } 
    Async.Start <| 
     async { 
      do! b 
      return ok() 
     }) 

这为r这对我来说是令人惊讶的,因为这是我所假设的Async.StartChild正在做的事情。任何想法哪个解决方案是最优的?

回答

4

我认为在启动使用<|>运算符创建的异步工作流时会发生堆栈溢出异常。到Async.StartChild呼叫开始第一工作流,这是使用<|>合并,并因此它是另一个调用Async.StartChild

一种简单的方法来解决它是安排在一个定时器的处理程序工作流(使得它没有被添加到当前堆栈)。喜欢的东西:

let (<|>) (a: Async<unit>) (b: Async<unit>) = 
    async { 
     do! Async.Sleep 1 
     let! x = Async.StartChild a 
     let! y = Async.StartChild b 
     do! x 
     do! y } 

一个更好的办法来解决它会创建自己的Seq.reduce - 当前实现折叠它一个接一个,所以你会得到深度10000的树,只包含一个单一的右侧的工作项目以及左侧的所有其他工作项目。如果你创建了一个工作项目的平衡二叉树,那么它不应该叠加,因为高度只有15左右。

编辑尝试用下面的函数替换Seq.reduce

module Seq = 
    let reduceBallanced f input = 
    let arr = input |> Array.ofSeq 
    let rec reduce s t = 
     if s + 1 >= t then arr.[s] 
     else 
     let m = (s + t)/2 
     f (reduce s m) (reduce m t) 
    reduce 0 arr.Length 
+0

使用'Async.Sleep 1'使代码慢得多。尽管'map'和'reduce'函数实际上做了一些有用的工作时可能不会那么明显,但这需要一些时间。 – svick

+0

其实,我现在不在乎时间/空间的复杂性 - 我真的很惊讶代码使用堆栈!如果结构堆在一起,这对n = 10K就可以。 – t0yv0

+0

@toyvo - 那么,启动工作流时会发生堆栈溢出,这显然会使用堆栈。运行工作流程不需要堆栈。 'Sleep 1'解决方法只是为了证明这确实是问题所在 - 使用构建平衡二叉树的Seq.reduce应该可以解决问题而不会增加开销。 –

2

我相信托马斯得到正确的答案的直觉,但这里是我自己的话,更详细的,花费相当多的后时间来解决这个问题。

  1. 问题是由于过度同步,上述代码没有实现预期的mapReduce算法。特别是,在ab都完成之前,a <|> b <|> c未启动c,因此实际上<|>对于具有两个以上计算的并行性是无用的。

  2. 第二个问题是async.Return xAsync.FromContinuations(fun (ok, _, _) -> ok x)同构。这个例子实际上是在单线程上按顺序执行的,并且分配的闭包使堆栈弹出。下面

对于感兴趣的读者,是我的设计该算法,这似乎票价更好一点(〜1秒上n=100000和〜21地图秒上n=100000和减少功能扩展用Async.Sleep 1000秒尝试,我有Core i3)。

let mapReduce (map : 'T1 -> Async<'T2>) 
       (reduce : 'T2 -> 'T2 -> Async<'T2>) 
       (input : seq<'T1>) : Async<'T2> = 
    let run (a: Async<'T>) (k: 'T -> unit) = 
     Async.StartWithContinuations(a, k, ignore, ignore) 
    Async.FromContinuations <| fun (ok, _, _) -> 
     let k = ref 0 
     let agent = 
      new MailboxProcessor<_>(fun chan -> 
       async { 
        for i in 2 .. k.Value do 
         let! x = chan.Receive() 
         let! y = chan.Receive() 
         return run (reduce x y) chan.Post 
        let! r = chan.Receive() 
        return ok r 
       }) 
     k := 
      (0, input) 
      ||> Seq.fold (fun count x -> 
       run (map x) agent.Post 
       count + 1) 
     agent.Start() 
0

另一个,简单的实现可以是这样的:

let mapReduce' (map : 'T1 -> Async<'T2>) 
       (reduce : 'T2 -> 'T2 -> Async<'T2>) 
       (input : seq<'T1>) : Async<'T2> = 
     async { 
      let! r = input |> Seq.map map |> Async.Parallel 
      return r |> Array.toSeq 
        |> Seq.reduce (fun a b -> reduce a b |> Async.RunSynchronously) 

     } 

在该地图相是并行执行的,然后减少相位是连续的,因为它有前一计算值数据依赖性。

+0

正如你指出的那样,这里减少不能交错或重新排序,所以它不一样。 – t0yv0

2

非常有趣的讨论! 我有一个类似的问题与Async.Parallel

let (<||>) first second = async { let! results = Async.Parallel([|first; second|]) in return (results.[0], results.[1]) } 

let test = async { do! Async.Sleep 100 } 
(test, [1..10000]) 
||> List.fold (fun state value -> (test <||> state) |> Async.Ignore) 
|> Async.RunSynchronously // stackoverflow 

我感到非常沮丧,所以,我解决它通过创建我自己的并行组合子。

let parallel<'T>(computations : Async<'T> []) : Async<'T []> = 
    Async.FromContinuations (fun (cont, exnCont, _) -> 
    let count = ref computations.Length 
    let results : 'T [] = Array.zeroCreate computations.Length 
    computations 
     |> Array.iteri (fun i computation -> 
      Async.Start <| 
       async { 
        try 
         let! res = computation 
         results.[i] <- res 
        with ex -> exnCont ex 

        let n = System.Threading.Interlocked.Decrement(count) 
        if n = 0 then 
         results |> cont 
       })) 

最后通过讨论启发,我实现了以下功能的MapReduce

// (|f ,⊗|) 

let mapReduce (mapF : 'T -> Async<'R>) (reduceF : 'R -> 'R -> Async<'R>) (input : 'T []) : Async<'R> = 
let rec mapReduce' s e = 
    async { 
     if s + 1 >= e then return! mapF input.[s] 
     else 
      let m = (s + e)/2 
      let! (left, right) = mapReduce' s m <||> mapReduce' m e 
      return! reduceF left right 
    } 
mapReduce' 0 input.Length 
+1

太棒了!我非常喜欢这个。我想知道托马斯是否在他的评论中考虑了这个解决方案。我认为这与我自己的答案之间的区别在于,您使用的是严格的裁减计划,而我正在使用先到先得的服务。我可以想象,对于一些输入,我的解决方案会得到更好的降低排序,但对于大多数输入而言,由于协调开销会较慢。我可以扮演魔鬼的倡导者,并在我的解决方案获胜的情况下建立输入(使用许多快速地图,一个非常缓慢的地图,并缓慢减少):https://gist.github.com/1131917 – t0yv0

+0

一个重要的想法是代数上我的解决方案需要还原函数必须服从结合律。 (检查列表同态) –

+0

当然,我的解决方案还需要减少关联。然而,你并没有充分利用它提供的自由,有时候会减少等待时间。 – t0yv0