2014-01-09 99 views
3

假设我有一个服务:拍摄快照的的IObservable <T>

public interface ICustomersService 
{ 
    IObservable<ICustomer> Customers 
    { 
     get; 
    } 
} 

Customers财产开始通过抢占了所有现有客户,并把它们传递到观察员,执行后,它只能通过对客户说稍后会添加到系统中。因此,它永远不会完成。

现在假设我想抓住当前客户的快照(作为List<ICustomer>),忽略将来可能添加的任何内容。我怎么做?任何ToList()或其亲属的调用将永远阻止,因为序列永远不会完成。

我想我可以写我自己的扩展,所以我想这:

public static class RxExtensions 
{ 
    public static List<T> ToSnapshot<T>(this IObservable<T> @this) 
    { 
     var list = new List<T>(); 

     using (@this.Subscribe(x => list.Add(x))); 

     return list; 
    } 
} 

这似乎工作。例如:

var customers = new ReplaySubject<string>(); 

// snapshot has nothing in it 
var snapshot1 = customers.ToSnapshot(); 

customers.OnNext("A"); 
customers.OnNext("B"); 

// snapshot has just the two customers in it 
var snapshot2 = customers.ToSnapshot(); 

customers.OnNext("C"); 

// snapshot has three customers in it 
var snapshot3 = customers.ToSnapshot(); 

我意识到目前的实现依赖于调度是当前线程,否则在收到项目之前ToSnapshot可能会关闭其订阅。不过,我怀疑我也可以包含一个ToSnapshot覆盖,它需要IScheduler并确保在结束快照之前接收到计划在那里的任何项目。

我无法在Rx中找到这种快照功能。我错过了什么吗?

+0

假设“快照”仅仅是枚举当前成员所产生的枚举......然后我无法想象快照例程会做什么ToList()不会做。你能详细说明一下吗? –

+0

他的'Snapshot'扩展程序立即处理订阅,仅收到'Subscribe'方法返回之前产生的通知。 –

+0

这实际上比这更多。对'IObservable '使用'ToList()'将直到'IObservable '完成才会完全没有任何问题。由于我的'IObservable'永远不会完成,因此只需调用'ToList()'就会无限期地阻塞。 –

回答

0

你在这里所做的实际上很漂亮。 ToSnapshot的工作原因是因为在释放控制流之前,您的订阅逻辑的底层实现是将所有客户交给观察者。基本上,Dispose仅在控制流被释放后才被调用,并且控制流只在获得所有预先存在的联系后才被释放。

虽然这是,这也是一个误导。你写的方法,ToSnapshot,应该被命名为TakeSyncronousNotifications。该扩展正在对潜在观察值的工作方式做出重大假设,并且不符合Rx的精神。

为了让消费者更容易理解,我将公开显示声明返回的其他属性。

public interface ICustomersService 
{ 
    IEnumerable<ICustomer> ExistingCustomers { get; } 
    IObservable<ICustomer> NewCustomers { get; } 
    IObservable<ICustomer> Customers { get; } 
} 

public class CustomerService : ICustomerService 
{ 
    public IEnumerable<ICustomer> ExistingCustomers { get { ... } } 
    public IObservable<ICustomer> NewCustomers { get { ... } } 
    public IObservable<ICustomer> Customers 
    { 
     get 
     { 
      return this.ExistingCustomers.ToObservable().Concat(this.NewCustomers); 
     } 
    } 
} 

编辑:

考虑以下问题...

50 = x + y。解决并评估x

数学只是不工作,除非你知道y是什么。在这个例子中,y是“新客户”,x是“现有客户”,而50是两者的组合。

通过仅公开现有客户和新客户的组合,而不是现有客户和新客户自己,您已经丢失了太多数据。您需要向消费者展示至少xy,否则无法解决其他问题。

+0

感谢您的输入。你在Rx中没有提到可以做我想做的事情。在这种情况下,我用Rx咆哮错误的树吗? –

+0

你试图一次做太多事情。 Rx非常适合新客户部分。现有客户应该公开为IEnumerable,并且可以使用上面给出的代码公开两者的组合。 –

1

你可以尝试使用超时您观察到的

source.Customers().TakeUntil(DateTime.Now).ToEnumerable(); 
+0

谢谢,但这太难以预料了。我正在寻找具有明确定义的语义的东西。此外,在枚举上调用“ToList”只会导致超时异常(奇怪)。 –

+0

抱歉超时是错误的。我会解决它。 – bradgonesurfing

+0

修复了使用TakeUntil(DateTime.Now)的代码。超时异常是您使用Timeout时应该期待的。我的错误。 – bradgonesurfing

1

有几种方法可以解决这个。我在商业项目中尝试了以下成功案例:

1)一种单独的方法,可以像克里斯演示的那样获得当前客户的可枚举数。

2)将“世界状况”调用与直播流相结合的方法 - 这比Chris的示例更有意义,因为为了确保没有错过的数据,通常必须首先开始直播流,然后获取快照,然后将它们与去重复组合。

我使用定制的Observable.Create实现实现了这一点,该实现高速缓存直播流直到检索历史记录,然后在切换到现场之前将高速缓存与历史记录合并。

返回的客户,但包含描述数据的年龄的额外的元数据。

3)最近,对我来说返回IObservable<IEnumerable<Customer>>更有用,其中第一个事件是整个世界状态。这个更有用的原因是我工作的很多系统都批量更新,而且整个批次更新UI的速度往往比逐项更快。除此之外,它与(2)类似,只不过您可以使用FirstAsync()来获取所需的快照。

我建议你考虑这种方法。如果需要,您可以随时使用SelectMany(x => x)IObservable<IEnumerable<Customer>>的数据流平整为IObservable<Customer>

当我回到家庭办公室时,我会看看是否可以挖掘出一个示例实施!

+0

选项2只适用于冷观察对象。否则,你有可能错过第一个“n”个参数。 –