2016-09-23 46 views
0

我试过下面FlowableOnBackpressureDrop忽略认购的请求方法

Flowable.interval(100L, TimeUnit.MILLISECONDS) 
    .onBackpressureDrop() 
    .observeOn(Schedulers.computation()) 
    .subscribe(new Subscriber<Long>() { 

     private Subscription subscription; 

     @Override 
     public void onSubscribe(Subscription subscription) { 
     this.subscription = subscription; 
     this.subscription.request(1L); 
     } 

     @Override 
     public void onNext(Long t) { 
     try { 
      Thread.sleep(300L); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println(t); 

     subscription.request(1L); 
     } 

     ... 
    }); 

我的预期,我可能会得到这样的0,3,6 ... 不过,我得到0,1,2,3 ... 因为Flowable最初得到了request(Long.MAX_VALUE)

我检查FlowableOnBackpressureDrop,我发现

@Override 
    public void onSubscribe(Subscription s) { 
     if (SubscriptionHelper.validate(this.s, s)) { 
      this.s = s; 
      actual.onSubscribe(this); 
      s.request(Long.MAX_VALUE); 
     } 
    } 

由于没有使用我于请求方法中设置的值, 我认为背压不工作。

这是错误还是正确的操作?

我试过RxJava 2.0.0 RC2和RC3

回答

1

observeOn请求,因此没有下降一段时间在一开始缓存128元的前期。如果你让它运行128 * 300ms,它将开始跳过数值。您可以将预取量设置为1,并且看到值相当快地下降。

+0

谢谢您的信息!现在,我知道原因是'observeOn'和如何解决。我加了1来观察On的参数,我得到了我想要的。尽管我对背压下降选项缓冲项目感到不舒服,因为我不仅要关心请求方法,还要关心observeOn方法,RxJava 1.1的作用与此相同。所以这是从一开始的方式。我想知道有没有人对此有任何疑问。 – arching