我想你误解了Rx的一些问题。可观察对象没有过滤器,而且您不能“过滤”添加和移除过滤器。基于订阅者的观察数据也不可观察。
而是建立一个呼叫链。您从源观察开始,如addRecord
中的一个和removeRecord
事件中的一个。然后,您将这些可观测量链接到Rx中的各种operators形成新的观测值,最终您订阅最终可观测值。订阅将激活整个链条,并且当源事件触发时,所有操作员将触发并最终该事件(如果未被过滤)达到subscribe
。
你实际上可以做你在Rx中描述的东西。使用switchMap
这个可以让你将一个序列投影到另一个序列并每次切换到新序列的操作符都可以相对简单地更改可观察对象上的过滤器。例如filterSource.switchMap(filterFunction => Obs-1.filter(filterFunction))
。即使比这更简单,您可以退订第一次订阅并重新设置Rx链。然而,使用构建函数会使等式中出现很多杂耍状态。
但是,我强烈怀疑你并不需要这种复杂的行为。你想要的东西可以像这样存档:
var Src-1 = fromEvent(dataSource, 'addRecord') // create the first source
var Src-2 = fromEvent(dataSource, 'removeRecord') // and the other source
var Obs-1 = Src-1.combineLatest(Src-2) // combine both sources
.filter(e => someCondition(e)) // filter the source
var Obs-2 = Obs-1.mergeMap(e => someOtherCondition(e) ? Change(e) : Rx.Observable.of(e)) // on someOtherCondition, either transform the source with the `Change(e)` function. Or keep it unchanged with `of(e)`
var Obs-3 = Obs-2.filter(e => anotherCondition(e)) // Filter again
var sub = Obs-3.subscribe() // activate the sequence.
感谢您纠正我。每个观察者(OBS-1,OBS-2等)都可以有自己的订阅,它将接收数据。实际上观察将被动态添加。我们如何做到这一点? – user3130446
你如何动态接收observables?你不是指订阅吗?我认为你需要更具体地了解你想要的以及你的来源是什么导致了这些变化。 Rx是关于反应式程序设计的,在反应式程序设计中,了解数据来自哪里,而不是如何操作它很重要。一旦你的信息来源清晰,其余的就会落实到位,现在你似乎在寻找错误的解决方案。 – Dorus