我有一个Iterable
需要执行的“工作单元”,没有特定的顺序,并且可以很容易地并行运行而不会相互干扰。斯卡拉并行无序迭代器
不幸的是,一次运行太多的内存将超过我的可用内存,所以我需要确保在任何给定的时间只有少数人同时运行。
在最基本的,我想这种类型签名的功能:
parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]
使得输出Iterator
不一定是相同的顺序输入(如果我想保持的地方知识结果来了,我可以用输入或其他东西来输出一对)。然后,消费者可以增量使用生成的迭代器,而不会耗尽机器的所有内存,同时保持尽可能多的并行性。
此外,我希望函数尽可能高效。最初的想法我是例如做一些事情大致如下:
xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)
,我希望在toSet
会通知Scala的并行集合,它可以尽快为他们准备好开始生产从它的迭代器的元素,以任何顺序,grouped
电话是限制同时工的人数。不幸的是,它看起来并不像toSet
调用达到预期的效果(结果按照它们在没有par
调用的情况下以相同顺序返回,在我的实验中),grouped
调用并不理想。例如,如果我们的组大小为100,并且这些作业中的99个立即在十几个核心上完成,但其中一个特别慢,剩下的大多数核心将闲置,直到我们可以移动到下一个组。有一个“自适应窗口”最多与我的块大小一样大,但不会受到慢速工人的阻碍。
我可以想像我自己写了一个类似于偷工排队的东西,但是我认为处理并发原语的很多努力已经完成在Scala的并行集合库中。有谁知道我可以重复使用哪些部分来构建这些功能,或者对如何实现这样的操作有其他建议?
没有重排序支持,如果特定元素需要一段时间才能计算,那么事情是否会停止? –
取决于你的意思是'将事情停止' - 其他元素将保持计算,但并行计算启动的线程将阻塞,直到所有完成为止,因此直到所有内容完成后才能访问它们。因此,如果迭代器中的第一个元素需要时间,那么不会阻止最后一个被计算,但它会阻止您访问它。 – Impredicative
那么,主要问题之一是内存使用情况,所以我确实希望它停止计算最后一个元素。乱序结果实际上只是一种优化,可以减少这种“等待”。 –