2
我面对很奇怪的RxJava行为,我不明白。RxJava调度程序不会改变线程与睡眠
假设我想要并行处理元素。我使用flatMap为:
public static void log(String msg) {
String threadName = Thread.currentThread().getName();
System.out.println(String.format("%s - %s", threadName, msg));
}
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (true) {
log("start");
s.onNext(Math.random());
sleep(10);
}
}).subscribeOn(sA)
.flatMap(r -> Observable.just(r).subscribeOn(sB))
.doOnNext(r -> log("process"))
.subscribe((r) -> log("finish"));
}
输出是相当可预测的:
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-2 - process
pool-2-thread-2 - finish
pool-1-thread-1 - start
pool-2-thread-3 - process
pool-2-thread-3 - finish
好吧好吧,但是如果我有n> 10补充睡眠的地图flatMap并行调度停止变化后的线程。
public static void main(String[] args) throws InterruptedException {
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (true) {
log("start");
s.onNext(Math.random());
sleep(10);
}
}).subscribeOn(sA)
.flatMap(r -> Observable.just(r).subscribeOn(sB))
.doOnNext(r -> sleep(15))
.doOnNext(r -> log("process"))
.subscribe((r) -> log("finish"));
}
是什么给了以下内容:
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-1 - process
为什么???为什么所有元素都在flatMap之后在同一个线程(pool-2-thread-1)中进行处理?
谢谢。而且为什么我没有在没有睡眠的情况下偷看这个线程呢? – corvax
源之间存在非确定性的排放竞争,某些线程也可能发出其他线程的元素。 – akarnokd