2013-02-06 215 views
3

我有一个Iterable需要执行的“工作单元”,没有特定的顺序,并且可以很容易地并行运行而不会相互干扰。斯卡拉并行无序迭代器

不幸的是,一次运行太多的内​​存将超过我的可用内存,所以我需要确保在任何给定的时间只有少数人同时运行。

在最基本的,我想这种类型签名的功能:

parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B] 

使得输出Iterator不一定是相同的顺序输入(如果我想保持的地方知识结果来了,我可以用输入或其他东西来输出一对)。然后,消费者可以增量使用生成的迭代器,而不会耗尽机器的所有内存,同时保持尽可能多的并行性。

此外,我希望函数尽可能高效。最初的想法我是例如做一些事情大致如下:

xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator) 

,我希望在toSet会通知Scala的并行集合,它可以尽快为他们准备好开始生产从它的迭代器的元素,以任何顺序,grouped电话是限制同时工的人数。不幸的是,它看起来并不像toSet调用达到预期的效果(结果按照它们在没有par调用的情况下以相同顺序返回,在我的实验中),grouped调用并不理想。例如,如果我们的组大小为100,并且这些作业中的99个立即在十几个核心上完成,但其中一个特别慢,剩下的大多数核心将闲置,直到我们可以移动到下一个组。有一个“自适应窗口”最多与我的块大小一样大,但不会受到慢速工人的阻碍。

我可以想像我自己写了一个类似于偷工排队的东西,但是我认为处理并发原语的很多努力已经完成在Scala的并行集合库中。有谁知道我可以重复使用哪些部分来构建这些功能,或者对如何实现这样的操作有其他建议?

回答

3

并行集合框架允许您指定要用于给定任务的最大线程数。使用Scala的2.10,你想做的事:

def parMap[A,B](x : Iterable[A], f : A => B, chunkSize : Int) = { 
    val px = x.par 
    px.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(chunkSize)) 
    px map f 
} 

这将避免在任何时间运行多个chunkSize操作。这使用下面的工作窃取策略来保持参与者的工作,因此不会遇到与上面的grouped示例相同的问题。

但是,这样做不会将结果重新排序为首次完成的顺序。为此,我会建议将您的操作转换为演员,并有一个小型演员池执行操作,然后在完成后将结果发回给您。

+0

没有重排序支持,如果特定元素需要一段时间才能计算,那么事情是否会停止? –

+0

取决于你的意思是'将事情停止' - 其他元素将保持计算,但并行计算启动的线程将阻塞,直到所有完成为止,因此直到所有内容完成后才能访问它们。因此,如果迭代器中的第一个元素需要时间,那么不会阻止最后一个被计算,但它会阻止您访问它。 – Impredicative

+0

那么,主要问题之一是内存使用情况,所以我确实希望它停止计算最后一个元素。乱序结果实际上只是一种优化,可以减少这种“等待”。 –