2016-05-13 53 views
2

我在Akka Stream中了解到,一个插座必须连接到一个插座,并且没有自动支持将多个接收器连接到相同的信号源。所以你必须插入中间对象,如BroadcastAkka Stream:在图形构建器阶段动态增长插座

我正在将一个信号处理DAG转换为一个Akka Stream图,如果我可以在通过遍历发现它们时动态地添加接收器,它将会对我有很大的帮助。如果我有自定义GraphStage,我可以在Graph.create阶段有我自己的Shapeoutlets系列动态增长吗?正常的DSL操作~>由这个调用支持:

b.addEdge(importAndGetPort(b), to) 

如何生成器“获得” Outlet在这里,我将能够在其上生长的需求我的形状?


如果这不起作用,是有可能“退出”之前播出,断开其边缘及连接线路与图形建设过程中一个新的更大的广播?

+0

我不知道潜在问题的答案,但是这种设计有一个很大的缺点:信息源只会像最慢的目的地一样快地传播值。当所有接收器处理完一条消息时,广播只会表示更多的需求。你的问题似乎更适合直接使用Actor。然后,您可以使用路由动态添加到Actor的目的地。 http://doc.akka.io/docs/akka/2.4.2/scala/routing.html –

+0

@RamonJRomeroyVigil是的,源只能处理一次所有的接收器准备就绪。这是故意的,因为源节点可能执行昂贵且不可重复的计算。如果需要,我可以在源和汇之间插入缓冲区。 –

回答

1

GraphDSL不允许动态改变你的形状。

但是,由于Akka 2.4.10可以使用BroadcastHub(和MergeHub)。

BroadcastHub可以为您提供一个接收器,可以实现为一个源。 这个来源可以按照需要多次实现,以动态附加多个订阅者。

因此,对于你DAG的节点(例如,具有入度= 1和出度= 3),你可以有像

val hubSource = inEdgeSource.toMat(BroadcastHub.sink(bufferSize = ...))(Keep.right).run() 

val nodeSink1 = hubSource.to(outEdgeSink1).run() 
val nodeSink2 = hubSource.to(outEdgeSink2).run() 
val nodeSink3 = hubSource.to(outEdgeSink3).run() 

阿卡文档:

http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Dynamic_fan-in_and_fan-out_with_MergeHub_and_BroadcastHub