2017-07-17 26 views
1

我放在一起这个虚拟实例,试图了解backpressure好一点:了解流动背压rxjava2

Flowable.range(1, 100).onBackpressureDrop() 
         .subscribeOn(Schedulers.io()) 
         .observeOn(AndroidSchedulers.mainThread()) 
         .subscribeWith(object : DisposableSubscriber<Int>() { 
         override fun onStart() { 
          request(1) 
         } 

         override fun onComplete() { 
          Log.d([email protected]::class.java.simpleName, "onComplete") 
         } 

         override fun onNext(t: Int?) { 
          Log.d([email protected]::class.java.simpleName, t.toString()) 
          Thread.sleep(1000) 
          request(1) 
         } 

         override fun onError(t: Throwable?) { //handle error} 
         }) 

我有一个非常缓慢的Subscriber从一个非常快的Flowable消耗数据。我正在指示Flowable onBackPressureDrop()。尽管这样,我的输出看起来像这样(从1到100)

07-16 23:07:21.097 22389-22389 D: 1 
07-16 23:07:22.100 22389-22389 D: 2 
07-16 23:07:23.102 22389-22389 D: 3 
07-16 23:07:24.104 22389-22389 D: ... 
07-16 23:07:24.104 22389-22389 D: ... 
07-16 23:07:24.105 22389-22389 D: 99 
07-16 23:07:25.105 22389-22389 D: 100 
07-16 23:07:25.107 22389-22389 D: onComplete 

我期待缺失的元素,因为用户是极其缓慢的,但事实并非如此,在所有打印从1到100号到控制台,每秒一次。

接下来,我试图一次请求所有值。所以我将request(1)替换为onStartrequest(Long.MAX_VALUE),并从onNext调用中删除request(1)。但它仍然打印数字1到100,没有丢失元素。

所以我想知道如何才能模拟一个用户缓慢的用户失踪事件? 我怎样才能使反压异常发生?

感谢

回答

3

observeOn有128默认的内部缓冲区的大小,这就是为什么你没有看到下降的元素,因为它可以简单地缓冲了所有你生成的100个元素。您可以通过observeOn(mainThread(), false, 1)将缓冲区大小设置为1,并且体验下降。

+0

这样做!谢谢。我遇到滴,但只有'.onBackPressureDrop()'操作符存在。为什么我删除它时不会得到异常?即使缓冲区已满,它也会正常打印所有值。 – feresr

+1

因为'Flowable.range'支持背压,并且可以与'observeOn'互操作,不管缓冲区有多大,也不会发信号通知MBE。顺便说一句,每个运算符在Flowable上都有一个** Backpressure ** javadoc条目来解释它的行为。 – akarnokd