2016-09-15 56 views
3

我的方案很简单,但我似乎无法在任何地方找到它。等待多个异步调用完成RxJava

我有一组元素,我想遍历每个人调用一个异步函数,然后等待他们完成(这又一次发生在一个异步的方式,在函数的逻辑中实现)。我对RxJava比较新,并且在NodeJS中通过将回调传递给该函数并在最后等待来实现。 这里是什么,我需要的伪代码(用于元素的迭代器并不需要是同步的,也没有订购):

for(line in lines){ 
callAsyncFunction(line); 
} 
WAIT FOR ALL OF THEM TO FINISH 

你的帮助是非常感谢!

+0

请发布实际的代码。 RxJava拥有许多具有不同行为的运营商。不知道你使用哪个操作员,这是不可能的。 –

+0

使用countdownlatch类,并在每个可观察调用countDown方法的oncomplete方法中。 – gaston

回答

1

使用的Rx:

Observable 
.from(lines) 
.flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io()) 
.ignoreElements(); 

在这一点上,这取决于你想做的事,你可以使用一个.switchIfEmpty(...)订阅另一个观察到的东西。

0

使用defer,然后在观察对象上使用zip,将“要计算的项目”的Iterable转换为Iterable of Observables。

更困难的部分将是“等待他们完成”。正如你可能已经猜到的那样,反应扩展是关于对事物“做出反应”的,而不是“等待事情发生”。您可以将subscribe添加到observable,它将发出单个项目,然后在每个observable只有一个项目时完成。该用户可以在等待后执行您通常会执行的任何操作;这允许你返回并让你的代码在不阻塞的情况下做它的事情。

0

从技术上讲,如果你想到它,你需要做的是从你的所有元素创建一个Observable,然后zip them together继续你的流的执行。

这将在伪代码给你的东西是这样的:

List<Observable<?>> observables = new ArrayList<>(); 
for(line in lines){ 
    observables.add(Observable.fromCallable(callAsyncFunction(line)); 
} 
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all() 

但它可能是毫不奇怪,Observable.from()可以在一个迭代暴露的每一个元素为对象的流,从而消除你为一个循环需要。因此,您可以使用Observable.fromCallable()创建一个新的Observable,它在异步操作完成时调用onCompleted()。之后,您可以通过将这些新的Observables收集到列表中来等待。

Observable.from(lines) 
    .flatMap(new Func1<String, Observable<?>>() { 
     @Override 
     public Observable<?> call(String line) { 
      return Observable.fromCallable(callAsyncFunction(line)); // returns Callable 
     } 
    }).toList() 
     .map(new Func1<List<Object>, Object>() { 
     @Override 
     public Object call(List<Object> ignored) { 
      // do something; 
     } 
    }); 

我基于这个答案的下半部分重大this answer