2016-02-17 31 views
0

假设我有下面的简单图。任意停止运行akka流图

class KafkaSource[A](kI: KafkaIterator) extends GraphStage[SourceShape[A]] { 

    val out = Outlet[A]("KafkaSource.out") 

    override val shape = SourceShape.of(out) 

    override def createLogic(attr: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, kI.next) 
     } 
     }) 
    } 
} 

val g = GraphDSL.create(){ implicit b => 
    val source = b.add(new KafkaSource[Message](itr)) 
    val sink = b.add(Sink.foreach[Message](println)) 

    source ~> sink 
    ClosedShape 
} 

我们正在运行它作为

RunnableGraph.fromGraph(g).run() 

我想发信号kafkaSource停止(或人工完成),而不是推动下一个可用的元素,使连接段下游也停止。

我该如何做到这一点?

这种情况是,我们有数百万条消息在kafka &我们希望在每天晚上9点停止处理消息(例如),并假设我们正在使用干净关闭来停止正在运行的应用程序。

回答

0

虽然phantomastray任何更多的人可能不相关,但有帮助:

一个KILLSWITCH允许从 外FlowShape的曲线图的完成。它由一个流程元素组成,该元素可链接到需要完成控制的FlowShape的图形 。 KillSwitch特性允许 完成或失败图形。 [来源:Akka Docs]