我对基于异步的程序中的堆栈溢出感到惊讶。我怀疑的主要问题是用下面的函数,这是为了构成2个异步计算并行执行,并等待这两个完成:F#异步堆栈溢出
let (<|>) (a: Async<unit>) (b: Async<unit>) =
async {
let! x = Async.StartChild a
let! y = Async.StartChild b
do! x
do! y
}
利用该定义,我有以下mapReduce
程序试图利用map
和reduce
部分的并行性。非正式地,这个想法是使用一个共享通道激发N
映射器和N-1
reducer,等待它们完成并从通道读取结果。我有我自己的Channel
实施,由ConcurrentBag
这里更换为更短的代码(该问题影响):
let mapReduce (map : 'T1 -> Async<'T2>)
(reduce : 'T2 -> 'T2 -> Async<'T2>)
(input : seq<'T1>) : Async<'T2> =
let bag = System.Collections.Concurrent.ConcurrentBag()
let rec read() =
async {
match bag.TryTake() with
| true, value -> return value
| _ -> do! Async.Sleep 100
return! read()
}
let write x =
bag.Add x
async.Return()
let reducer =
async {
let! x = read()
let! y = read()
let! r = reduce x y
return bag.Add r
}
let work =
input
|> Seq.map (fun x -> async.Bind(map x, write))
|> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)
async {
do! work
return! read()
}
现在以下基本测试开始抛出StackOverflowException上N = 10000:
let test n =
let map x = async.Return x
let reduce x y = async.Return (x + y)
mapReduce map reduce [0..n]
|> Async.RunSynchronously
编辑:另一种实施<|>
组合子使得对N = 10000测试成功:
let (<|>) (a: Async<unit>) (b: Async<unit>) =
Async.FromContinuations(fun (ok, _, _) ->
let count = ref 0
let ok() =
lock count (fun() ->
match !count with
| 0 -> incr count
| _ -> ok())
Async.Start <|
async {
do! a
return ok()
}
Async.Start <|
async {
do! b
return ok()
})
这为r这对我来说是令人惊讶的,因为这是我所假设的Async.StartChild
正在做的事情。任何想法哪个解决方案是最优的?
使用'Async.Sleep 1'使代码慢得多。尽管'map'和'reduce'函数实际上做了一些有用的工作时可能不会那么明显,但这需要一些时间。 – svick
其实,我现在不在乎时间/空间的复杂性 - 我真的很惊讶代码使用堆栈!如果结构堆在一起,这对n = 10K就可以。 – t0yv0
@toyvo - 那么,启动工作流时会发生堆栈溢出,这显然会使用堆栈。运行工作流程不需要堆栈。 'Sleep 1'解决方法只是为了证明这确实是问题所在 - 使用构建平衡二叉树的Seq.reduce应该可以解决问题而不会增加开销。 –