我有一个Rxjs可观测(stream
在下面的代码中),它会发射可观测量(subjOne
和subjTwo
)。每个内部观察者可以随时以任何顺序发出自己的值。我的任务是从subjOne捕获值,直到subjTwo发出其第一个值。Javascript observables:需要switchMap的功能,但有轻微的差异
const subjOne = new Subject();
const subjTwo = new Subject();
const stream = Observable.create(observer => {
observer.next(subjOne);
observer.next(subjTwo);
});
stream
.someOperator(subj => subj)
.subscribe(value => console.log('Value: ', value));
实施例1: subjOne
发射值1和2,然后subjTwo
发射值3,则subjOne
发射4. 输出应为:1,2,3
实施例2: subjTwo
发出1,然后subjOne
发出2. 输出应该是1.
switchMap不适合这里,因为它从subjOne
尽快删除值从stream
发射。任何关于如何实现这一点的想法?谢谢。
UPDATE:在我的实际情况,不但有两个内部观测 - subjOne
和subjTwo
- 但他们络绎不绝,人工手动硬编码subjOne.takeUntil(subjTwo)
是不是一个可行的选择。
我的不好,我没有提到,在我的情况下,实际上有更多的内部可观察量不止两个。 –
那么你如何确定哪个流依赖于哪个流? – paulpdaniels
它应该从任何内部可观测值中吐出值,直到一些更新的内部可观测值(称为X)发出其第一个值。此时,它应该忽略来自旧观测值的所有未来值,并仅使用可观测X值。与原始问题中所述相同,但有1000个观测值,而不是两个。所以如果我正确地理解你的问题,那就是重要的那些内部观察者出现的顺序。 –