2016-01-15 49 views
1

我遇到了使用RxJava背压的问题。基本上,我有一个生产者生产的产品比消费者可以处理的要多,并且希望有一些缓冲队列来处理我可以处理的物品,并请求完成其中的一些物品,如本例中:RxJava/RxScala使用请求的背压

object Tester extends App { 

Observable[Int] { subscriber => 
    (1 to 100).foreach { e => 
    subscriber.onNext(e) 
    Thread.sleep(100) 
    println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")") 
    } 
} 
.subscribeOn(NewThreadScheduler()) 
.observeOn(ComputationScheduler()) 
.subscribe(
    new Subscriber[Int]() { 
    override def onStart(): Unit = { 
     request(2) 
    } 

    override def onNext(value: Int): Unit = { 
     Thread.sleep(1000) 
     println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")") 
     request(1) 
    } 

    override def onCompleted(): Unit = { 
     println("finished ") 
    } 
}) 

Thread.sleep(100000) 

我希望得到输出如下

produced 1(RxNewThreadScheduler-113) 
consumed 1(RxComputationThreadPool-312) 
produced 2(RxNewThreadScheduler-113) 
consumed 2(RxComputationThreadPool-312) 
produced 3(RxNewThreadScheduler-113) 
consumed 3(RxComputationThreadPool-312) 
...... 

,而是,我得到

produced 1(RxNewThreadScheduler-113) 
produced 2(RxNewThreadScheduler-113) 
produced 3(RxNewThreadScheduler-113) 
produced 4(RxNewThreadScheduler-113) 
produced 5(RxNewThreadScheduler-113) 
produced 6(RxNewThreadScheduler-113) 
produced 7(RxNewThreadScheduler-113) 
produced 8(RxNewThreadScheduler-113) 
produced 9(RxNewThreadScheduler-113) 
consumed 1(RxComputationThreadPool-312) 
produced 10(RxNewThreadScheduler-113) 
produced 11(RxNewThreadScheduler-113) 
produced 12(RxNewThreadScheduler-113) 
produced 13(RxNewThreadScheduler-113) 
..... 
+0

添加'onError'处理程序到你的用户,让您获取有关故障的信息。 –

回答

1

当你实现你的Observable使用Observable.create管理背压取决于你(这不是一项简单的任务)。在这里,你的observable简单地忽略了被动的pull请求(你只是迭代,而不是等待一个调用迭代器的方法next()的请求)。

如果可能的话,尽量使用Observable工厂方法等range,等...和构成使用map/flatMap以获得期望的源观察的,如那些将尊重背压。

否则,请查看最近推出的用于在OnSubscribe实施中正确管理背压的实验工具类:AsyncOnSubscribeSyncOnSubscribe

这是一个非常简单的例子:

Observable<Integer> backpressuredObservable = 
    Observable.create(SyncOnSubscribe.createStateful(
    () -> 0, //starts the state at 0 
    (state, obs) -> { 
     int i = state++; //first i is 1 as desired 
     obs.next(i); 
     if (i == 100) { //maximum is 100, stop there 
      obs.onCompleted(); 
     } 
     return i; //update the state 
})); 
+0

链接被破坏 – AndroidEx

+0

我的坏:(似乎最新的javadocs没有在网上发布,所以我链接到源代码... –

+0

也我把代码的lambda版本,但编译器有点麻烦接受它,所以你可能会更好的使用匿名类的等价物来代替......除非你使用Scala :) –