2015-10-21 26 views
4

我是RxJava的新手,在(我猜)简单问题上挣扎。我想在3个线程中模拟地处理订阅部分。这就是为什么我使用FixedThreadPool。示例代码:如何在Android上使用RxJava在多个线程上运行子系列

Observer.just("one", "two", "three", "four") 
.observeOn(Schedulers.io()) 
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3)) 
.subscribe(new Observer<String>() { 

    public void onNext(String string) { 
     Log.d(TAG, "Started: " + string); 
     SystemClock.sleep(1000); 
     Log.d(TAG, "Ended: " + string); 
    } 

    (...) 

} 

预期结果:

Started: one 
Started: two 
Started: three 
Ended: one 
Started: four 
Ended: two 
Ended: three 
Ended: four 

实际结果:

Started: one 
Ended: one 
Started: two 
Ended: two 
Started: three 
Ended: three 
Started: four 
Ended: four 

我在做什么错?

回答

5

RxJava Observables是连续的,subscribeOnobserveOn运算符不会并行运行值。

可以达到最接近的事是通过模键分组值,通过observeOn运行它们,然后合并结果:

AtomicInteger count = new AtomicInteger(); 

Observable.range(1, 100) 
.groupBy(v -> count.getAndIncrement() % 3) 
.flatMap(g -> g 
    .observeOn(Schedulers.computation()) 
    .map(v -> Thread.currentThread() + ": " + v)) 
.toBlocking() 
.forEach(System.out::println); 
+0

感谢您的回答。但是,我是否正确理解该解决方案“为每个线程设置单独的队列”,因此如果任务不需要花费相同的时间量,那么最终某些线程可能会提前完成,而一个线程仍有多个任务需要运行。我的问题是RxJava支持在多个线程之间使用共享队列吗? – reinra

+0

这个设置和任何共享队列都没有偷工作。 – akarnokd

相关问题