2017-06-29 64 views
0

我有一个Observable<Observable<String>> c。我想用新行将每个内部Observable合并为一个String。结果应该是Observable<String>如何组合最近的一个Observable <Observable <T>>?

有一个similar discussion for RxJS on GitHub

下面是使用zip一个实现:

final Observable<String> d = Observable.zip(c, objects -> Arrays.stream(objects) 
    .map(x -> (String)x) 
    .collect(Collectors.joining("\n"))); 

的问题是,zip对元素,因此如果内Observable对象具有不同的长度,那么不一切都被示出。我想用combineLatest代替,但似乎没有成为一个匹配功能:

public static <T, R> Observable<R> combineLatest(
    ObservableSource<? extends ObservableSource<? extends T>> sources, 
    Function<? super Object[], ? extends R> zipper) 

我应该如何实现呢?


举例来说,如果我有三个观测:

// a 
Observable.just("Started", "0%", "5%", "10%", "20%", "40%", "95%", "100%", "Finished") 

// b 
Observable.just("Started", "0%", "10%", "30%", "40%", "100%", "Finished") 

// c 
Observable.just("Started", "0%", "20%", "25%", "40%", "70%", "90%", "100%", "Finished") 

然后我zip解决方案的输出是:

--------------------- 
a: Started 
b: Started 
c: Started 
--------------------- 
a: 0% 
b: 0% 
c: 0% 
--------------------- 
a: 5% 
b: 10% 
c: 20% 
--------------------- 
a: 10% 
b: 30% 
c: 25% 
--------------------- 
a: 20% 
b: 40% 
c: 40% 
--------------------- 
a: 40% 
b: 100% 
c: 70% 
--------------------- 
a: 95% 
b: Finished 
c: 90% 

但所需的输出是一样的东西:

--------------------- 
a: Started 
b: Started 
c: Started 
--------------------- 
a: 0% 
b: 0% 
c: 0% 
--------------------- 
a: 5% 
b: 10% 
c: 20% 
--------------------- 
a: 10% 
b: 30% 
c: 25% 
--------------------- 
a: 20% 
b: 40% 
c: 40% 
--------------------- 
a: 40% 
b: 100% 
c: 70% 
--------------------- 
a: 95% 
b: Finished 
c: 90% 
--------------------- 
a: 100% 
b: Finished 
c: 100% 
--------------------- 
a: Finished 
b: Finished 
c: Finished 

回答

0

使用RxJava2(和Java 8)

c.toList().toObservable() 
      .flatMap(t -> 
        Observable.combineLatest(t, 
              objects -> Arrays.stream(objects) 
                  .map(x ->(String)x).collect(Collectors.joining("\n")))) 

使用RxJava

c.toList().flatMap(t -> 
        Observable.combineLatest(t, 
              objects -> Arrays.stream(objects) 
                  .map(x -> (String)x).collect(Collectors.joining("\n")))); 

我怀疑它会做你想要什么,但因为它是完全有可能将“跳过”排放量,但就是这样combineLatest作品。

+0

这似乎加入了错误的方式。在我的'zip'版本中,每个内部observables在每个结果元素中都有一行。在这个版本中,每个结果元素都有一个内部可观察对象的所有行。 – sdgfsdh

+0

您可以添加一个输入示例和您想要输出的内容吗? – JohnWowUs

+0

我更新了问题@JohnWowUs – sdgfsdh

相关问题