0
假设我有下面的简单图。任意停止运行akka流图
class KafkaSource[A](kI: KafkaIterator) extends GraphStage[SourceShape[A]] {
val out = Outlet[A]("KafkaSource.out")
override val shape = SourceShape.of(out)
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, kI.next)
}
})
}
}
val g = GraphDSL.create(){ implicit b =>
val source = b.add(new KafkaSource[Message](itr))
val sink = b.add(Sink.foreach[Message](println))
source ~> sink
ClosedShape
}
我们正在运行它作为
RunnableGraph.fromGraph(g).run()
我想发信号kafkaSource停止(或人工完成),而不是推动下一个可用的元素,使连接段下游也停止。
我该如何做到这一点?
这种情况是,我们有数百万条消息在kafka &我们希望在每天晚上9点停止处理消息(例如),并假设我们正在使用干净关闭来停止正在运行的应用程序。