2015-09-10 70 views
3

当使用Akka Streams时,有什么方法可以关闭/关闭不再需要资源清理的流吗?关闭资源清理的Akka流

编辑:当源由无限流组成时,它可能永远不会完成,我想在完成源之前停止它。

用法示例:

Source.from(publisher) 
     .map((p) -> p) 
     .to(Sink.ignore()) 
     .run(materializer) 

有没有一种方法来关闭流?

+0

当上游源不再有任何数据满足下游需求时,流应自然停止。所以当发布者向下游发送完成时,这个物化流实例应该停止。或者,如果你有一个长时间运行的流程,你可以考虑使用一个单独的物化器,然后在物化器上调用'shutdown()' – cmbaxter

+0

由于Source是一个发布者,它可以是一个无限流(在我的情况下来自卡夫卡的一条小河),它永远不会完成。 – aseychell

+0

通过'发布者',你能从下游的'Flow'访问'Subscription'吗?这样你就可以取消它了吗?你的'发布者'的impl类是什么? – cmbaxter

回答

4

您可以运行在一个独立的ActorMaterializerStream和一段时间后调用shutdown的ActorMaterializer:

val actorSystem = ActorSystem() 

val temporaryStream = { 

    val localMat = ActorMaterializer()(actorSystem) 

    import actorSystem.dispatcher 
    actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() } 

    Source.from(publisher) 
     .map((p) -> p) 
     .to(Sink.ignore()) 
     .run()(localMat) 
} 

同样,你可以返回,而不是ActorMaterializer基于物化流和关闭ActorMaterializer一些外部条件,而不是时间。

+0

我发现为每个流创建一个新的actor系统效率相当低。在我的场景中,我使用akka流来订阅Apache Kafka主题,如果我使用您的建议,这需要多个流并需要大量资源(每个主题都有一个线程池)。 – aseychell

+0

根据您的新需求查看我的更新建议... –

+0

我正在使用'Materializer'接口而不是'ActorMaterializer',并且未在接口上找到关闭方法。谢谢! – aseychell