当使用Akka Streams时,有什么方法可以关闭/关闭不再需要资源清理的流吗?关闭资源清理的Akka流
编辑:当源由无限流组成时,它可能永远不会完成,我想在完成源之前停止它。
用法示例:
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run(materializer)
有没有一种方法来关闭流?
当使用Akka Streams时,有什么方法可以关闭/关闭不再需要资源清理的流吗?关闭资源清理的Akka流
编辑:当源由无限流组成时,它可能永远不会完成,我想在完成源之前停止它。
用法示例:
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run(materializer)
有没有一种方法来关闭流?
您可以运行在一个独立的ActorMaterializer
的Stream
和一段时间后调用shutdown的ActorMaterializer:
val actorSystem = ActorSystem()
val temporaryStream = {
val localMat = ActorMaterializer()(actorSystem)
import actorSystem.dispatcher
actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() }
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run()(localMat)
}
同样,你可以返回,而不是ActorMaterializer基于物化流和关闭ActorMaterializer一些外部条件,而不是时间。
当上游源不再有任何数据满足下游需求时,流应自然停止。所以当发布者向下游发送完成时,这个物化流实例应该停止。或者,如果你有一个长时间运行的流程,你可以考虑使用一个单独的物化器,然后在物化器上调用'shutdown()' – cmbaxter
由于Source是一个发布者,它可以是一个无限流(在我的情况下来自卡夫卡的一条小河),它永远不会完成。 – aseychell
通过'发布者',你能从下游的'Flow'访问'Subscription'吗?这样你就可以取消它了吗?你的'发布者'的impl类是什么? – cmbaxter