2013-06-18 31 views
12

是否有可能使用Scala的并行集合来并行化Iterator而不是事先完全评估它?Scala中的并行迭代器

这里我说的是在Iterator上并行化函数转换,即mapflatMap。 我认为这需要预先评估Iterator的某些元素,然后计算更多,一旦有一些元素通过next消耗。

我能找到的所有内容都需要将迭代器最多转换为IterableStream。当我呼叫.par时,Stream会得到完全评估。

我也欢迎实施建议,如果这不是现成的。实施应支持并行mapflatMap

+0

答案是_probably no_,但是你可以多说一点你想从中得到什么吗?特别是,计算何时应该开始运行 - 在创建迭代器之后,还是一旦你调用了一些强制评估的东西? –

+0

@RexKerr看起来像一个设计选择;但让它在第一次请求时开始,使第一个请求变得特别。我目前正试图实现这样的事情,并且我选择立即开始运行并存储下一个“n”结果。一旦消耗完了,我计算一个替代品。 – ziggystar

回答

3

你最好是用标准库的赌注很可能无法使用并行收集,但concurrent.Future.traverse

import concurrent._ 
import ExecutionContext.Implicits.global 
Future.traverse(Iterator(1,2,3))(i => Future{ i*i }) 

但我认为这将执行,只要它可以在启动整个事情。并联

1

从ML,遍历迭代元素:

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

我类似的原因移出Future.traverse。对于我的使用案例,保持N个工作正常工作,我最终用代码来限制从作业队列中提供执行上下文。

我的第一次尝试涉及阻塞馈线线程,但冒着也冒着想要在执行上下文上产生任务的任务的风险。你知道什么,阻止是邪恶的。

+0

你可以评论你为什么使用'(NUM_CPU + 1)^ 2'作为阻塞队列的大小吗? – ziggystar

+0

另外我发现了一个难题:1.我并不擅长并发编程2.“flatMap”更难。 – ziggystar

+0

@ziggystar“你”的意思是ML上的“Juha”。我不认为这是一个神奇的数字:足够大以至于消费者不会超越原始迭代器(也许可能做I/O)加映射函数(CPU限制,他说,但长或短运行?)。我看到未来喂养队列会阻止,而不会调用“阻塞”;也许+1是从“期望的并行性”中遗留下来的。我的解决方案在管道检查结束时检查了更多的工作,即工作要做的最后一件事就是检查是否有足够的工作正在进行,如果没有,则喂养野兽。我同意这很难,简单是关键。 –

0

这是一个有点难以完全地按照你以后,但也许是这样的:

val f = (x: Int) => x + 1 
val s = (0 to 9).toStream map f splitAt(6) match { 
    case (left, right) => left.par; right 
} 

这将eveluate在平行前6件F,然后在剩下的返回流。

+0

这似乎并不是平行运行 - 您是否需要将'map f'移动到'par'之后? – DNA

6

我意识到这是一个老问题,但iterata库中的ParIterator实现是否按照您的要求进行操作?

scala> import com.timgroup.iterata.ParIterator.Implicits._ 
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId)) 
scala> it.map(_._2).toSet.size 
res2: Int = 8 // addition was distributed over 8 threads 
+1

它解决了这个问题。不过,这可能会更有效一些,因为如果在一个块内的操作的运行时间有很大的变化,那么会阻塞很多。 – ziggystar

+0

@ziggystar如何更有效率? –

+0

'ParIterator'将'Iterator'分割成块。所以如果你有小块(例如大小为2),一个元素需要1s,另一个需要10s,那么你的并行化不好。一旦工人变得空闲,一个不同的实现可以向工作人员提供来自迭代器的新元素。 – ziggystar