Puzzling behavior in scalaz-stream with chunk and zipWithIndex
I want to process a stream of data using scalaz-stream with an expensive operation※.
scala> :paste
// Entering paste mode (ctrl-D to finish)
def expensive[T](x: T): T = {
  println(s"EXPENSIVE! $x")
  x
}
^D
// Exiting paste mode, now interpreting.
expensive: [T](x: T)T
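※ (Yes, yes, I know that mixing code with side effects is bad functional programming style. The print statement is only there to track how many times expensive() is called.)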
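For anyone who would rather count the calls than read them off the console, here is a minimal sketch using only the standard library. The names `CallCounter`, `calls`, and `expensiveCounted` are hypothetical and not part of my original code, and the example runs over a plain Vector rather than a scalaz-stream Process.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CallCounter {
  // Hypothetical counter (not in the original code): one increment per invocation.
  val calls = new AtomicInteger(0)

  // Same shape as expensive(), but records the call instead of printing.
  def expensiveCounted[T](x: T): T = {
    calls.incrementAndGet()
    x
  }

  def main(args: Array[String]): Unit = {
    // Plain-collection usage: a strict map calls the function once per element.
    val out = Vector(Vector(0, 1), Vector(2, 3)).map(x => expensiveCounted(x))
    println(s"result = $out, calls = ${calls.get}")
  }
}
```

The same wrapper could be mapped over the stream in place of expensive(), and the counter read after the run to compare the two pipelines.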
Before passing the data to the expensive operation, I first need to split it into chunks.
scala> val chunked: Process[Task,Vector[Int]] = Process.range(0,4).chunk(2)
chunked: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await([email protected],<function1>,Emit(SeqView(...),Halt(scalaz.stream.Process$End$)),Emit(SeqView(...),Halt(scalaz.stream.Process$End$)))
scala> chunked.runLog.run
res1: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())
Then I map the expensive operation over the stream of chunks.
scala> val processed = chunked.map(expensive)
processed: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await([email protected],<function1>,Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)),Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)))
When I run this, it calls expensive() the expected number of times:
scala> processed.runLog.run
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
res2: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())
However, if I chain a call to zipWithIndex, expensive() gets called many more times:
scala> processed.zipWithIndex.runLog.run
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
res3: scala.collection.immutable.IndexedSeq[(Vector[Int], Int)] = Vector((Vector(0, 1),0), (Vector(2, 3),1), (Vector(),2))
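Is this a bug? If it is the intended behavior, can anyone explain why? If expensive() takes a long time, you can see why I would prefer the result with fewer calls.
Here is a gist with more examples: https://gist.github.com/underspecified/11279251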
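Thanks for the explanation and the reference. Wrapping my expensive process in a delay eliminated the redundant calls. Essentially, the problem is how scalaz-stream mixes strict and lazy evaluation, right? What I still don't understand is why the result of chunk isn't fully determined before map is called. Is that impossible with streaming data? – underspecified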
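To illustrate the strict versus delayed distinction mentioned in the comment, here is a rough sketch in plain Scala. It does not use scalaz-stream or scalaz's Task: the `delay` helper below is a hypothetical stand-in built from a by-name parameter, and `StrictVsDeferred` and `calls` are likewise names made up for this example. It only shows the general idea that a delayed computation is captured without being run.

```scala
object StrictVsDeferred {
  // Counts invocations of expensive() so the evaluation points are observable.
  var calls = 0

  // Same shape as expensive() in the question, but counts instead of printing.
  def expensive[T](x: T): T = { calls += 1; x }

  // Hypothetical stand-in for a delay: captures the computation without running it.
  def delay[A](a: => A): () => A = () => a

  def main(args: Array[String]): Unit = {
    val strict = expensive(Vector(0, 1))          // evaluated immediately
    val deferred = delay(expensive(Vector(2, 3))) // nothing evaluated yet
    println(s"calls before forcing: $calls")
    val forced = deferred()                       // evaluated here, on demand
    println(s"calls after forcing: $calls, values: $strict, $forced")
  }
}
```

Note that this thunk is not memoized, so forcing it twice would run expensive() twice. Whether this matches what happens inside scalaz-stream is not something the sketch can confirm.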