2017-05-05 18 views
3

我有一个Rxjs可观测(stream在下面的代码中),它会发射可观测量(subjOnesubjTwo)。每个内部观察者可以随时以任何顺序发出自己的值。我的任务是从subjOne捕获值,直到subjTwo发出其第一个值。Javascript observables:需要switchMap的功能,但有轻微的差异

const subjOne = new Subject(); 
const subjTwo = new Subject(); 

const stream = Observable.create(observer => { 
    observer.next(subjOne); 
    observer.next(subjTwo); 
}); 

stream 
    .someOperator(subj => subj) 
    .subscribe(value => console.log('Value: ', value)); 

实施例1: subjOne发射值1和2,然后subjTwo发射值3,则subjOne发射4. 输出应为:1,2,3

实施例2: subjTwo发出1,然后subjOne发出2. 输出应该是1.

switchMap不适合这里,因为它从subjOne尽快删除值从stream发射。任何关于如何实现这一点的想法?谢谢。

UPDATE:在我的实际情况,不但有两个内部观测 - subjOnesubjTwo - 但他们络绎不绝,人工手动硬编码subjOne.takeUntil(subjTwo)是不是一个可行的选择。

回答

3

我想这你想要做什么:

// scan to let us keep combining the previous observable 
// with the next observable 
source 
    .scan((current, next) => { 
    // takeUntil to stop current when next produces 
    const currentUntil = current.takeUntil(next); 
    // now merge current with next 
    return currentUntil.merge(next) 
    }, Rx.Observable.empty()) 
    // switch to the most recent version of the combined inner observables 
    .switch(); 

注意这将如果内部可观察到的是只能正常工作。如果他们是冷的可观察的,它将需要更多的代码来实现。

0

听起来像是您在寻找takeUntil

takeUntil在一个流上监听,直到第二个流发出。因此,对于你的例子

// Emits every second 
const source = Observable.timer(0, 1000); 

// Emits once after 3 seconds 
const canceller = Observable.timer(3000); 

source 
    // Takes from the source until this stream emits 
    .takeUntil(canceller) 
    .subscribe({ 
    next: x => console.log(`Value ${x}`), 
    complete:() => console.log('Done') 
    }); 
// Value 0 -> 0 seconds 
// Value 1 -> 1 seconds 
// Value 2 -> 2 seconds 
// Done -> 3 seconds 
+0

我的不好,我没有提到,在我的情况下,实际上有更多的内部可观察量不止两个。 –

+0

那么你如何确定哪个流依赖于哪个流? – paulpdaniels

+0

它应该从任何内部可观测值中吐出值,直到一些更新的内部可观测值(称为X)发出其第一个值。此时,它应该忽略来自旧观测值的所有未来值,并仅使用可观测X值。与原始问题中所述相同,但有1000个观测值,而不是两个。所以如果我正确地理解你的问题,那就是重要的那些内部观察者出现的顺序。 –

0

你可以让你的二级学科,并且随后用takeUntil将它们结合起来:

const subjOne = new Subject(); 
const subjTwo = new Subject(); 

const stream = Observable.create(observer => { 
    observer.next(subjOne); 
    observer.next(subjTwo); 
}); 

const first$ = stream.take(1).flatMap(x=>x) 
const second$ = stream.skip(1).take(1).flatMap(x=>x) 

first$.takeUntil(second$).subscribe(...) 
+0

是的,但如果有100个可观察物而不是两个呢? –