2015-08-28 31 views
1

给定一个队列像这样:Scalaz流分块多达N

val queue: Queue[Int] = async.boundedQueue[Int](1000) 

欲拉断该队列和它传输到下游水槽,在UP的组块100。

queue.dequeue.chunk(100).to(downstreamConsumer) 

作品之类的,但如果我有说101个的消息就不会清空队列。剩下1条消息,除非另有99个消息被推入。我希望尽可能多地从队列中抽取100个消息,这与我的下游过程可以处理的速度一样快。

有一个现有的组合子可用?

回答

0

对于这一点,你可能需要从它出队时,监视队列的大小。那么,如果大小达到0,你就不会再等待更多的元素。实际上,您可以根据队列的大小实施elastic批量调整。即:

val q = async.unboundedQueue[String] 

val deq:Process[Task,(String,Int)] = q.dequeue zip q.size 
val elasticChunk: Process1[(String,Int), Vector[String]] = ??? 
val downstreamConsumer : Sink[Task,Vector[String]] = ??? 

deq.pipe(elasticChunk) to downstreamConsumer 
+0

你将如何实现elasticChunk? –

+0

我实际上使用方便的q.dequeueBatch方法解决了这个问题。不知道它存在。 –