2014-04-25 48 views
2

我想用一个昂贵的操作※使用scalaz-stream处理数据流。用chunk和zipWithIndex在scalaz-stream中令人费解的行为

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

    def expensive[T](x:T): T = { 
     println(s"EXPENSIVE! $x") 
     x 
    } 
    ^D 
// Exiting paste mode, now interpreting. 

expensive: [T](x: T)T 

※是的,是的,我知道混合代码与副作用是不好的函数式编程风格。打印语句仅用于跟踪昂贵()被调用的次数。)

在将数据传递给昂贵的操作之前,我首先需要将它分成块。

scala> val chunked: Process[Task,Vector[Int]] = Process.range(0,4).chunk(2) 
chunked: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await([email protected],<function1>,Emit(SeqView(...),Halt(scalaz.stream.Process$End$)),Emit(SeqView(...),Halt(scalaz.stream.Process$End$))) 

scala> chunked.runLog.run 
res1: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector()) 

然后,我将昂贵的操作映射到块的流上。

scala> val processed = chunked.map(expensive) 
processed: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await([email protected],<function1>,Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)),Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$))) 

当我执行此,它调用昂贵的()预期的次数:

scala> processed.runLog.run 
EXPENSIVE! Vector(0, 1) 
EXPENSIVE! Vector(2, 3) 
EXPENSIVE! Vector() 
res2: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector()) 

但是,如果我链zipWithIndex一个电话,昂贵的()被调用很多次:

>scala processed.zipWithIndex.runLog.run 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0, 1) 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2, 3) 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
res3: scala.collection.immutable.IndexedSeq[(Vector[Int], Int)] = Vector((Vector(0, 1),0), (Vector(2, 3),1), (Vector(),2)) 

这是一个错误?如果这是所需的行为,任何人都可以解释为什么?如果昂贵()需要很长时间,您可以看到为什么我更喜欢使用较少的调用结果。

下面是更多的例子一个要点:https://gist.github.com/underspecified/11279251

回答

2

你看到这个issue,可以采取一些different forms。问题实质上是,map可以看到(并做些什么)chunk正在建立其结果的中间步骤。

此行为may change in the future,但在此期间有一些可能的解决方法。最简单的一种是包装您昂贵的功能在处理和使用的flatMap代替map

chunked.flatMap(a => 
    Process.eval(Task.delay(expensive(a))) 
).zipWithIndex.runLog.run 

另一种解决方案是包装您昂贵的功能在effectful道:

def expensiveChannel[A] = Process.constant((a: A) => Task.delay(expensive(a))) 

现在你可以使用through

chunked.through(expensiveChannel).zipWithIndex.runLog.run 

虽然目前的行为可能有点令人惊讶,但它也是一个很好的提醒t您应该使用类型系统来帮助您跟踪全部您关心的效果(以及长时间运行的计算可以是其中之一)。

+0

感谢您的解释和参考。在延迟中包装我昂贵的流程消除了多余的呼叫。从本质上讲,问题在于scala-stream如何混合严格和懒惰的评估,对吧?我仍然不明白的是,为什么chunk的结果在调用map之前没有完全确定。流数据是不可能的? – underspecified