2017-10-19 88 views
2

我有两个可观察:RxJs如何将两个重叠的可观察到的合并成一个

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-| 
-13--14--15--16--17--18--19-----20---------21--------------22------23--24--> 

第一个包含一些越来越多,但经过一段时间后停止(这些是来自数据库的光标结果) 第二类是不断涌现越来越多的人首先包含一些数字,但不要停止发射。 (这些都是新插入的数据到数据库)

我想这两个可观察一下一个连续的观察到这样的:

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24--> 

此观察到的包含了每一个号码只有一次,保持顺序发光。

如何使用尽可能少的内存来解决问题?

+0

想知道你是怎么得到这样两个重叠的观测量流?你的问题是什么意思? –

+1

我使用Rethinkdb。我在数据库中有旧数据,它由光标和新插入的数据(由换页传送)发送。在我从光标读取数据的同时,新插入的数据也由光标提供。这导致重叠 –

回答

2

我认为最好的办法在这里是缓冲b $,直到$流达到b $,然后发出所有b $的缓冲项并切换到b $。事情是这样的:

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'; 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'; 
 

 
const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x)); 
 

 
const a$ = fromMarble(a).share(); 
 
const b$ = fromMarble(b).share(); 
 

 
const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share(); 
 

 
const distinct$ = Rx.Observable.merge(
 
\t a$.takeUntil(switchingSignal$).map(x => x + '(from a)'), 
 
\t b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'), 
 
\t b$.skipUntil(switchingSignal$).map(x => x + '(from b$)') 
 
); 
 

 
distinct$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>

2

您可以通过采取从(.concat)与第二流连接起来的第一流的所有元素,除了最新的一个(.last)之前(.skipWhile含)的元素

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15' 
 
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24' 
 
const fromMarble = str => Rx.Observable.defer(() => { 
 
    console.log('side effect from subscribing to: ' + str); 
 
    return Rx.Observable.from(str.split('-').filter(v => v.length)); 
 
}); 
 

 
const a$ = fromMarble(a); 
 
const b$ = fromMarble(b); 
 

 
const distinct$ = Rx.Observable.concat(
 
    a$, 
 
    a$.last().switchMap(latest => 
 
    // .skipWhile + .skip(1) => skipWhile but inclusive 
 
    b$.skipWhile(v => v !== latest).skip(1) 
 
), 
 
); 
 

 
distinct$.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

做到这一点另外,如果订阅时有副作用(例如订阅时 - 将创建新光标),则可以通过使用例如const a$ = fromMarble(a).shareReaplay()来为所有订户共享该副作用。

你可以阅读更多有关共享的副作用:老文档中

+0

尼斯解决方案。但是,如果b $流在a流之后发出,则不同的$流将被不必要地延迟。不知道这是否是一个问题,但可以毫不拖延地解决。 – ZahiC

+0

很好的@ZahiC和另外一个问题是当'b $'是空的 - 你什么也得不到。所以我修改了我的答案来处理这种情况。 –

+0

如果b $在$后发射,这肯定会解决延迟问题,但现在您可能订阅b $的时间太晚(第一个流完成时,已订阅串联流)。如果b $订阅需要时间(例如连接到数据库),切换到b $将会变慢。您可以通过早点连接到b $来解决这个问题(hot Observable),但这样可能会丢失一些物品。我将增加另一个解决方案以供参考 – ZahiC

相关问题