2017-07-18 57 views
2

我正在尝试创建一个Flowable,它发出关于背压的事件以避免内存问题,同时并行运行每个转换阶段以提高效率。我创建了一个简单的测试程序,用于推理我的程序的不同步骤的行为,以及何时发生事件而不是等待不同阶段。使用observeOn时,为什么我的RxJava Flowable不尊重背压?

我的程序如下:

public static void main(String[] args) throws ExecutionException, InterruptedException { 
    Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList()) 
     .stream().map(i -> { 
     System.out.println("emitting:" + i); 
     return i; 
     }); 

    Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator()); 
    System.out.println(String.format("Buffer size: %d", flowable.bufferSize())); 

    Long count = flowable.onBackpressureBuffer(10) 
     .buffer(10) 
     .flatMap(buf -> { 
     System.out.println("Sleeping 500 for batch"); 
     Thread.sleep(500); 
     System.out.println("Got batch of events"); 
     return Flowable.fromIterable(buf); 
     }, 1) 
     .map(x -> x + 1) 
     .doOnNext(i -> { 
     System.out.println(String.format("Sleeping : %d", i)); 
     Thread.sleep(100); 
     System.out.println(i); 
     }) 
     .count() 
     .blockingGet(); 

    System.out.println("count: " + count); 
} 

当我跑,我得到的输出如预期那样尊重背压,其中一批事件被emmited到大小buffer,然后将它们flatmapped,最后是采取了一些行动,他们正在印刷,其中一个接一个:

Buffer size: 128 
emitting:0 
emitting:1 
emitting:2 
emitting:3 
emitting:4 
emitting:5 
emitting:6 
emitting:7 
emitting:8 
emitting:9 
Sleeping 500 for batch 
Got batch of events 
Sleeping : 1 
1 
Sleeping : 2 
2 
Sleeping : 3 
3 
Sleeping : 4 
4 
Sleeping : 5 
5 
Sleeping : 6 
6 
Sleeping : 7 
7 
Sleeping : 8 
8 
Sleeping : 9 
9 
Sleeping : 10 
10 
emitting:10 
emitting:11 
emitting:12 
emitting:13 
emitting:14 
emitting:15 
emitting:16 
emitting:17 
emitting:18 
emitting:19 
Sleeping 500 for batch 
Got batch of events 
Sleeping : 11 
11 
Sleeping : 12 
12 
Sleeping : 13 

但是,如果我试图通过增加.observeOn(Schedulers.computation())一些调用此并行运行的不同阶段,那么就好像我的程序不再尊重backpre ssure。我的代码现在看起来像:

public static void main(String[] args) throws ExecutionException, InterruptedException { 
    Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList()) 
     .stream().map(i -> { 
     System.out.println("emitting:" + i); 
     return i; 
     }); 

    Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator()); 
    System.out.println(String.format("Buffer size: %d", flowable.bufferSize())); 

    Long count = flowable.onBackpressureBuffer(10) 
     .buffer(10) 
     .observeOn(Schedulers.computation()) 
     .flatMap(buf -> { 
     System.out.println("Sleeping 500 for batch"); 
     Thread.sleep(500); 
     System.out.println("Got batch of events"); 
     return Flowable.fromIterable(buf); 
     }, 1) 
     .map(x -> x + 1) 
     .observeOn(Schedulers.computation()) 
     .doOnNext(i -> { 
     System.out.println(String.format("Sleeping : %d", i)); 
     Thread.sleep(100); 
     System.out.println(i); 
     }) 
     .observeOn(Schedulers.computation()) 
     .count() 
     .blockingGet(); 

    System.out.println("count: " + count); 
} 

而且我的输出是下面的,所有我的事件被发出,而不是尊重由执行各阶段规定的背压和缓冲区前期:

Buffer size: 128 
emitting:0 
emitting:1 
emitting:2 
emitting:3 
emitting:4 
emitting:5 
emitting:6 
emitting:7 
emitting:8 
emitting:9 
emitting:10 
Sleeping 500 for batch 
emitting:11 
emitting:12 
... everything else is emitted here ... 
emitting:998 
emitting:999 
Got batch of events 
Sleeping 500 for batch 
Sleeping : 1 
1 
Sleeping : 2 
2 
Sleeping : 3 
3 
Sleeping : 4 
4 
Sleeping : 5 
Got batch of events 
Sleeping 500 for batch 
5 
Sleeping : 6 
6 
Sleeping : 7 
7 
Sleeping : 8 
8 
Sleeping : 9 
9 
Sleeping : 10 
Got batch of events 
Sleeping 500 for batch 
10 
Sleeping : 11 
11 
Sleeping : 12 
12 
Sleeping : 13 
13 
Sleeping : 14 
14 
Sleeping : 15 
Got batch of events 
Sleeping 500 for batch 
15 
Sleeping : 16 
16 
Sleeping : 17 
17 
Sleeping : 18 
18 
Sleeping : 19 
19 
Sleeping : 20 
Got batch of events 
Sleeping 500 for batch 
20 
Sleeping : 21 
21 
Sleeping : 22 
22 
Sleeping : 23 
23 
Sleeping : 24 
24 
Sleeping : 25 
Got batch of events 
Sleeping 500 for batch 
25 

假装我的批处理阶段正在呼叫外部服务,但我希望他们能够平行运行,因为延迟。我也希望在给定的时间内控制内存中的物品数量,因为最初发出的物品数量可能变化很大,并且批次运行的阶段比最初的事件发射速度慢得多。

如何在Scheduler范围内对Flowable尊重背压?为什么在我拨打observeOn的电话时,似乎不尊重背压?

回答

3

怎样才可以有我的可流动相的背压跨越调度

实际上,施加onBackpressureBuffer使得源上方它从由下游应用的任何背压断开,因为它是无界的操作员。你不需要它,因为Flowable.fromIterable(顺便说一句,RxJava有一个range运营商)支持并承诺背压。

为什么当我撒上电话观察时,只是不尊重背压?

在第一示例中,存在发生所谓调用堆栈阻断天然背压。 RxJava默认是同步的,大多数运算符不会引入异步,就像第一个例子中没有。

observeOn因此在理论上引入了异步边界,阶段可以相互平行运行。它有一个默认的128个元素预取缓冲区,可以通过它的一个过载来调整。然而,在你的情况下,缓冲区(10)实际上会将预取数量放大到1280,这可能仍然会导致1000个元素长的源一次完全消耗。

+0

为什么'buffer(10)'将预取量放大到'bufferSize * flowablePrefetchBufferSize'?确实,如果我改变这个发出10,000个整数,而不是第1280个整数后,我可以看到它由于背压而停止发射,但是,我应该如何收集批量的东西发送到外部服务而不会爆炸我记忆中的物品? –

+0

由于'buffer'理解当它获得N个列表的请求时,它必须从其上游请求10 * N个元素来满足该需求。正如我所提到的,你可以通过参数化'observeOn'来减少请求。 – akarnokd

+0

我明白了,感谢所有的帮助! –

相关问题