我有Observable调用消息,其中包含几条消息。我想同时处理这些消息。我可以如何使用rxJava?如何在并行线程中执行observable
消息。(代码以执行观测的物品平行).subscribe(MSG->处理(MSG))
(如果可观察到的包含五个不同的消息然后我需要在五个单独的线程来处理这五个消息)
我有Observable调用消息,其中包含几条消息。我想同时处理这些消息。我可以如何使用rxJava?如何在并行线程中执行observable
消息。(代码以执行观测的物品平行).subscribe(MSG->处理(MSG))
(如果可观察到的包含五个不同的消息然后我需要在五个单独的线程来处理这五个消息)
如果你想留在Observable
世界里,你可以flatMap
与subscribeOn
和计算每个元素要在并行:
Observable.range(1, 10)
.flatMap(v ->
Observable.fromCallable(() -> compute(v))
.subscribeOn(Schedulers.computation)
)
.subscribe(e -> { }, Throwable::printStackTrace);
,如果你想让结果保持给定的顺序,那么你可以使用'concatMapEager'而不是'flatMap'并保持并发性。 – Andy
运行单个线程“观察”您的消息,并将其中包含的每条消息分配给提交给某种workers thread pool的新消息处理任务(例如,简单的Runnable
)。
在这里,你会发现简单的操作方法:https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html
这听起来像多线程和NIO反应模型的混合。你确定这将是一件好事吗? – duffymo
订阅5 Observable的观察者? – gaston