2016-09-04 39 views
3

我有这种情况,当一个sink(或中间flow)实际上可以产生一些副作用数据,必须推回(或附加)到Source。有没有一种方法可以使用流式DSL来实现这一点?我可以使用一些阻塞队列或排序来创建source,然后将数据直接推送到该队列,但是这是打破流的抽象。也许有一个我不知道的更好的解决方案?Akka流 - 连接水槽到源?

+1

如果“水槽”产生输出,那么它不是水槽,而是一个有效的流量。 –

+0

@ViktorKlang好了,我可以有条件地将'Flow'连接到它的'Source',所以在某些情况下,由这个特定的'Flow'发出的事件将会通过Graph的根,就像它由图的'Source'? – jdevelop

+1

是的,它使用GraphDSL并启用圆形图。请记住,循环背压图需要一些深思熟虑才能正确使用。 –

回答

2

正如Viktor所说,你可以使用圆形图。

例如,partition阶段允许您选择流的特定元素。

def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1 

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.foreach[Int](println) 

    val addOne = Flow[Int].map(_ + 1) 

    val partition = builder.add(Partition[Int](2, partitionFunction)) 
    val merge = builder.add(Merge[Int](2)) 

          in ~> merge ~> partition 
    partition.out(0) ~> addOne ~> merge 
    partition.out(1) ~> out 

    ClosedShape 
    }) 

在这个例子中,源极in连接到merge的一个输入。整数然后通过partition阶段,这将分离偶数和奇数。

偶数正在经历addOne流程,然后进入merge的第二个输入(这将使它们再次返回到partition阶段)。

奇怪的是直接去水槽out

这允许将一些值反馈回图中,但它很容易导致循环(这就是为什么addOne阶段在这里很重要,没有它的偶数会被困在图中)。

1

Reactive-kafka做了类似的事情(至少在0.8版本中):它将Sink所消耗的消息提交给源代码(Kafka consumer)。

KafkaCommitterSink是执行。尽管这不是一个真正的圆形图,但据我所知,它更独立于流的“更新”源。