假设我有一个服务:拍摄快照的的IObservable <T>
public interface ICustomersService
{
IObservable<ICustomer> Customers
{
get;
}
}
的Customers
财产开始通过抢占了所有现有客户,并把它们传递到观察员,执行后,它只能通过对客户说稍后会添加到系统中。因此,它永远不会完成。
现在假设我想抓住当前客户的快照(作为List<ICustomer>
),忽略将来可能添加的任何内容。我怎么做?任何ToList()
或其亲属的调用将永远阻止,因为序列永远不会完成。
我想我可以写我自己的扩展,所以我想这:
public static class RxExtensions
{
public static List<T> ToSnapshot<T>(this IObservable<T> @this)
{
var list = new List<T>();
using (@this.Subscribe(x => list.Add(x)));
return list;
}
}
这似乎工作。例如:
var customers = new ReplaySubject<string>();
// snapshot has nothing in it
var snapshot1 = customers.ToSnapshot();
customers.OnNext("A");
customers.OnNext("B");
// snapshot has just the two customers in it
var snapshot2 = customers.ToSnapshot();
customers.OnNext("C");
// snapshot has three customers in it
var snapshot3 = customers.ToSnapshot();
我意识到目前的实现依赖于调度是当前线程,否则在收到项目之前ToSnapshot
可能会关闭其订阅。不过,我怀疑我也可以包含一个ToSnapshot
覆盖,它需要IScheduler
并确保在结束快照之前接收到计划在那里的任何项目。
我无法在Rx中找到这种快照功能。我错过了什么吗?
假设“快照”仅仅是枚举当前成员所产生的枚举......然后我无法想象快照例程会做什么ToList()不会做。你能详细说明一下吗? –
他的'Snapshot'扩展程序立即处理订阅,仅收到'Subscribe'方法返回之前产生的通知。 –
这实际上比这更多。对'IObservable'使用'ToList()'将直到'IObservable '完成才会完全没有任何问题。由于我的'IObservable'永远不会完成,因此只需调用'ToList()'就会无限期地阻塞。 –