2016-09-28 25 views
3

我有Observable调用消息,其中包含几条消息。我想同时处理这些消息。我可以如何使用rxJava?如何在并行线程中执行observable

消息。(代码以执行观测的物品平行).subscribe(MSG->处理(MSG))

(如果可观察到的包含五个不同的消息然后我需要在五个单独的线程来处理这五个消息)

+0

这听起来像多线程和NIO反应模型的混合。你确定这将是一件好事吗? – duffymo

+0

订阅5 Observable的观察者? – gaston

回答

3

如果你想留在Observable世界里,你可以flatMapsubscribeOn和计算每个元素要在并行:

Observable.range(1, 10) 
.flatMap(v -> 
    Observable.fromCallable(() -> compute(v)) 
    .subscribeOn(Schedulers.computation) 
) 
.subscribe(e -> { }, Throwable::printStackTrace); 
+0

,如果你想让结果保持给定的顺序,那么你可以使用'concatMapEager'而不是'flatMap'并保持并发性。 – Andy