2016-11-12 59 views
2

我仍然在掌握Akka流概念,并试图了解如何将它们映射到场景中,以便我们收集需要以原子方式处理的项目集合。假设我们有一个由多个项目组成的采购订单,我们需要对每个项目应用一些处理,然后将其合并回单一值。此类工作流程是否应成为自己的单独流(或子流),一旦采购订单得到全面处理,该流(或子流)就会关闭?即每个采购订单开始一个新的流?或者我有一系列永不停止的采购订单?但如果是这样,我不会有混合来自不同订单的采购订单的问题吗?Akka流和事务边界

换句话说,我试图实现的是处理不同工作流程的隔离,并想知道阿卡流是否为它提供了很好的匹配。

回答

2

直接回答您的问题:可以创建一个“将处理应用于每个项目,然后将其合并回单一值”的流。

开发您的例子有一些示例代码:

case class Item(itemId : String) 

case class PurchaseOrder(orderId : String, items : Seq[Item]) 

val purchaseOrder : PurschaseOrder = ??? 

如果我们想处理与流,我们可以在项目,虽然减少的确切性质是在问题不明确,所以我不会定义如何折叠方式获得:

type ProcessOutput = ??? 

def processItem(item : Item) : ProcessOutput = ??? 

val combinedResult : Future[CombinedResult] = 
    Source.fromIterator(purchaseOrder.items.toIterator) 
     .via(Flow[Item] map processItem) 
     .to(Sink.fold[ProcessOutput](???)(???)) 
     .run() 

间接回答你的问题,

考虑期货首先

当背压是必要的时,阿卡流非常有用。当连接到外部数据源时背压很常见,因为bp允许您的应用程序确定数据流式传输的速度,因为您负责连续发送更多数据的需求。

在您提出问题的情况下,不需要广播需求,and incur the inherent overhead,这种沟通需要。你已经有项目的集合,所以没有人送需求...

相反,我认为期货是去为您所描述的情况下,最好的办法:

def futProcess(item : Item)(implicit ec : ExecutionContext) = 
    Future { processItem(item) } 

// same output type as the stream run 
val combinedResults : Future[CombinedResult] = 
    Future.sequence{ purchaseOrder.items map futProcess } 
     .map{ _ fold[ProcessOutput](???)(???) } 

您将得到更好的性能,从一个完整的ActorSystem周围的复杂性和完全相同的结果无论如何...

+0

谢谢你的详细答案。我也明白,在某些情况下,您指出流可能不是最理想的。但是我认为在一般情况下,我会看到很多潜力巨大的潜力,并希望检查它们。 –

+0

@VagifAbilov欢迎您,愉快的黑客入侵。 –