TL; DR你可以在一段时间后关闭子。但是,使用输入来动态设置内置阶段的时间是另一回事。
关闭一个子
要关闭的流动时,通常完成它(从上游),但也可以取消它(从下游)。例如,一旦n
元素已经通过,take(n: Int)
流程将取消。
现在,在groupBy
的情况下,您无法完成一个子流,因为上游流被所有子流共享,但是您可以取消它。如何取决于你想要结束的条件。
但是,要知道,groupBy
消除输入,用于已经被关闭的子流程:如果ID为3
一个新的元素来自上游到groupBy
的3
-substream关闭后,它只会被忽略,在未来元素将被拉入。其原因可能是在关闭和重新打开子流之间的过程中可能会丢失一些元素。此外,如果您的流应该运行很长时间,这会影响性能,因为每个元素都将在转发到相关(实时)子流之前针对已关闭的子流列表进行检查。如果你不满意这个表现,你可能想要实现你自己的有状态过滤器(比如说,使用bloom过滤器)。
要关闭一个子流,我通常使用take
(如果您只需要给定数量的元素,但在无限流上可能不是这种情况),或者某种超时:completionTimeout
如果您想要固定从实现到关闭的时间或idleTimeout
如果您想在没有元素通过一段时间时关闭。请注意,这些流不取消流,但是失败了,所以你必须赶上与recover
或recoverWith
阶段异常改变失败成取消(recoverWith
允许您取消不发送任何最后一个元素,通过与Source.empty
恢复) 。
动态设置超时
现在,你想要的是根据第一通过元素动态设置关闭时间。这更复杂,因为流的实现与通过它们的元素无关。事实上,在通常情况下(没有groupBy
)的情况下,流在任何元素通过它们之前都已经实现,因此使用元素来实现它们是没有意义的。
我有类似的问题that question,并最终使用的groupBy
修改后的版本与签名
paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM])
,允许使用定义它的关键在于确定每个子。这可以修改为具有第一个元素(而不是关键字)作为参数。
另一种(可能更简单,就你的情况而言)的方式是编写自己的舞台,完全按照你的要求:从第一个元素获取结束时间并取消当前的流。这里是一个示例实现(我使用调度器而不是设置状态):
object CancelAfterTimer
class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("CancelAfter.in")
val out = Outlet[T]("CancelAfter.in")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (!isTimerActive(CancelAfterTimer))
scheduleOnce(CancelAfterTimer, getTimeout(elem))
push(out, elem)
}
override def onTimer(timerKey: Any): Unit =
completeStage() //this will cancel the upstream and close the downstrean
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}