直接回答您的问题:可以创建一个“将处理应用于每个项目,然后将其合并回单一值”的流。
开发您的例子有一些示例代码:
case class Item(itemId : String)
case class PurchaseOrder(orderId : String, items : Seq[Item])
val purchaseOrder : PurschaseOrder = ???
如果我们想处理与流,我们可以在项目,虽然减少的确切性质是在问题不明确,所以我不会定义如何折叠方式获得:
type ProcessOutput = ???
def processItem(item : Item) : ProcessOutput = ???
val combinedResult : Future[CombinedResult] =
Source.fromIterator(purchaseOrder.items.toIterator)
.via(Flow[Item] map processItem)
.to(Sink.fold[ProcessOutput](???)(???))
.run()
间接回答你的问题,
考虑期货首先
当背压是必要的时,阿卡流非常有用。当连接到外部数据源时背压很常见,因为bp允许您的应用程序确定数据流式传输的速度,因为您负责连续发送更多数据的需求。
在您提出问题的情况下,不需要广播需求,and incur the inherent overhead,这种沟通需要。你已经有项目的集合,所以没有人送需求...
相反,我认为期货是去为您所描述的情况下,最好的办法:
def futProcess(item : Item)(implicit ec : ExecutionContext) =
Future { processItem(item) }
// same output type as the stream run
val combinedResults : Future[CombinedResult] =
Future.sequence{ purchaseOrder.items map futProcess }
.map{ _ fold[ProcessOutput](???)(???) }
您将得到更好的性能,从一个完整的ActorSystem周围的复杂性和完全相同的结果无论如何...
谢谢你的详细答案。我也明白,在某些情况下,您指出流可能不是最理想的。但是我认为在一般情况下,我会看到很多潜力巨大的潜力,并希望检查它们。 –
@VagifAbilov欢迎您,愉快的黑客入侵。 –