我有这种情况,当一个sink
(或中间flow
)实际上可以产生一些副作用数据,必须推回(或附加)到Source
。有没有一种方法可以使用流式DSL来实现这一点?我可以使用一些阻塞队列或排序来创建source
,然后将数据直接推送到该队列,但是这是打破流的抽象。也许有一个我不知道的更好的解决方案?Akka流 - 连接水槽到源?
3
A
回答
2
正如Viktor所说,你可以使用圆形图。
例如,partition
阶段允许您选择流的特定元素。
def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.foreach[Int](println)
val addOne = Flow[Int].map(_ + 1)
val partition = builder.add(Partition[Int](2, partitionFunction))
val merge = builder.add(Merge[Int](2))
in ~> merge ~> partition
partition.out(0) ~> addOne ~> merge
partition.out(1) ~> out
ClosedShape
})
在这个例子中,源极in
连接到merge
的一个输入。整数然后通过partition
阶段,这将分离偶数和奇数。
偶数正在经历addOne
流程,然后进入merge
的第二个输入(这将使它们再次返回到partition
阶段)。
奇怪的是直接去水槽out
。
这允许将一些值反馈回图中,但它很容易导致循环(这就是为什么addOne
阶段在这里很重要,没有它的偶数会被困在图中)。
1
Reactive-kafka做了类似的事情(至少在0.8版本中):它将Sink所消耗的消息提交给源代码(Kafka consumer)。
KafkaCommitterSink是执行。尽管这不是一个真正的圆形图,但据我所知,它更独立于流的“更新”源。
相关问题
- 1. 将Akka HTTP连接到Akka流
- 2. 源数据到水槽
- 3. 使用Akka Streams的许多水槽的一个来源
- 4. 如何设计卡夫卡连接即水槽以及来源
- 5. 水槽错误:连接被拒绝
- 6. 火花流整合水槽
- 7. 编写自定义水槽NG源/水槽的最佳方式
- 8. 水槽 - 是否有可能只停止水槽来源?
- 9. 水槽水槽星火使用Scala的
- 10. Akka流源 - cassandra结果集
- 11. 实施水槽水槽
- 12. 使用水槽将水槽记录到水槽所需的格式
- 13. Qt转发插槽/连接插槽到插槽?
- 14. akka流中的流动问题
- 15. Akka http连接池
- 16. Akka远程连接
- 17. 如何 “contramap” 阿卡流水槽
- 18. 阿帕奇水槽不与Twitter流
- 19. 连接QAction插槽
- 20. 在AKKA-Streams中使用动态水槽目的地?
- 21. 收集水槽物化值作为源
- 22. 如何将paintEvent连接到插槽?
- 23. 如何将信号连接到插槽
- 24. 几个QSliders连接到一个插槽
- 25. 无法连接到家长插槽
- 26. Qt将信号连接到插槽
- 27. 连接到驱动器槽ssh跳
- 28. QAction不会连接到我的插槽
- 29. 如何rowCountChanged连接到插槽
- 30. 将QAction连接到虚拟插槽?
如果“水槽”产生输出,那么它不是水槽,而是一个有效的流量。 –
@ViktorKlang好了,我可以有条件地将'Flow'连接到它的'Source',所以在某些情况下,由这个特定的'Flow'发出的事件将会通过Graph的根,就像它由图的'Source'? – jdevelop
是的,它使用GraphDSL并启用圆形图。请记住,循环背压图需要一些深思熟虑才能正确使用。 –