2016-08-21 62 views
1

我使用下面的代码来测试阿卡流Flow.batch的行为从光源发出的多种元素,但我不能以某种方式弄清楚为什么结果不是我所期望:如何批量使用阿卡流Flow.batch

Source(1 to 20) 
    .map(x => { 
    println(s"received: ${x}") 
    x 
    }) 
    .batch(max=3, first => first.toString) {(batch, elem) => { 
    batch + "," + elem 
    }} 
    .runWith(Sink.foreach(x=>{ 
    Thread.sleep(4000) 
    println("Out:" + x) 
    })) 

这里是输出: received: 1 received: 2 received: 3 received: 4 Out:1,2,3 received: 5 Out:4 received: 6 Out:5 received: 7 Out:6 received: 8 Out:7 received: 9 Out:8 received: 10 Out:9 received: 11 Out:10 received: 12 Out:11 .... so on .... received: 19 Out:18 received: 20 Out:19 Out:20

有几点我不明白这里:

  • 首先,我的水槽是非常SL奥尔。我预计该物品将在下游排放之前一起进行配料,例如:Out:1,2,3; Out:4,5,6; Out:7,8;出:9,10,11等。相反,它只被分批一次(1,2,3),但是随后元素被逐个发射而不是被分批。
  • 为什么我收到了4项(已收:1,...,收到:4)在正确的开始,而事实上,我只设置最大值= 3(批次(最大值= 3))。
  • 因为源比宿更快。我期望该元素应该发射得更快,例如:收到:7,收到:8,收到:9;然后输出:7,8,9;但实际上,它只是在Sink的println函数执行后才逐个散发出来。

我曾试图改变地图mapAsync但行为仍然不是我要找:

.mapAsync(1)(x => { 
    println(s"received: ${x}") 
    Future.successful(x) 
    }) 

感谢。

回答

1

没有异步边界在你的代码的任何地方,它会在单个线程上运行。基本上,虽然你的Thread.sleep()在这个设置中没有发生其他进展,但是批处理不会发生(因为线程在Thread.sleep上被阻塞)。如果你有这样的设置,那么你可以使用分组()而不是批处理,或者可以使用groupedWithin()。如果你仍然想尝试批量(),然后尝试一个油门阶段,而不是添加睡眠。油门不会阻塞螺纹,所以上游进度(配料)不受影响。