从技术上讲,如果你想到它,你需要做的是从你的所有元素创建一个Observable,然后zip them together继续你的流的执行。
这将在伪代码给你的东西是这样的:
List<Observable<?>> observables = new ArrayList<>();
for(line in lines){
observables.add(Observable.fromCallable(callAsyncFunction(line));
}
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all()
但它可能是毫不奇怪,Observable.from()
可以在一个迭代暴露的每一个元素为对象的流,从而消除你为一个循环需要。因此,您可以使用Observable.fromCallable()
创建一个新的Observable,它在异步操作完成时调用onCompleted()
。之后,您可以通过将这些新的Observables收集到列表中来等待。
Observable.from(lines)
.flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String line) {
return Observable.fromCallable(callAsyncFunction(line)); // returns Callable
}
}).toList()
.map(new Func1<List<Object>, Object>() {
@Override
public Object call(List<Object> ignored) {
// do something;
}
});
我基于这个答案的下半部分重大this answer。
请发布实际的代码。 RxJava拥有许多具有不同行为的运营商。不知道你使用哪个操作员,这是不可能的。 –
使用countdownlatch类,并在每个可观察调用countDown方法的oncomplete方法中。 – gaston