订阅

2012-06-26 148 views
0

的处置时访问当前的窗口,我有以下代码:订阅

var observable = ... subscribe to event here ... 

var windows = observable.Window(TimeSpan.FromSeconds(240)); 

aggregatedWindows = windows.SelectMany(
    window => window.Aggregate(new Context(), AggregateContext)); 

subscription = aggregatedWindows.Subscribe(OnWindow); 

... later 

subscription.Dispose(); 

试想一个场景,当我在处理窗口,有人已要求我的应用程序应该关闭的中间。我要处置这个订阅,这将停止正在处理的事件,但是我也将失去最后一个窗口的信息。

我不知道什么是最好的方式来处理,这是...

,因为它是通过聚合函数传递我可以存储本地状态与上看到窗口(但这似乎是错误的)。 ..

任何帮助将不胜感激!

回答

0

您可以对窗口进行操作,而不是保留聚合订阅 - 这是您最初希望保持连接到最后一个窗口的时间,并在超时时使用超时断开连接划分。

这里使用了一个单独的类,因为使用Create使它成为自动分离 - 在处理调用完成后立即断开观察者。因此,从根本上来说,Dispose的含义就是这里所做的改变。

public static IObservable<T> DeferDisconnection<T>(this IObservable<T> observable, TimeSpan timeout) 
    { 
     return new ClosingObservable<T>(observable, timeout); 
    } 


    public class ClosingObservable<T> : IObservable<T> 
    { 

     private readonly IConnectableObservable<T> Source; 
     private readonly IDisposable Subscription; 
     private readonly TimeSpan Timeout; 

     public ClosingObservable(IObservable<T> observable, TimeSpan timeout) 
     { 
      Timeout = timeout; 
      Source = observable.Publish(); 
      Subscription = Source.Connect(); 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      Source.Subscribe(observer); 

      return Disposable.Create(() => Source.Select(_ => new Unit()) 
               .Amb(Observable.Timer(Timeout).Select(_ => new Unit())) 
               .Subscribe(_ => Subscription.Dispose()) 
               ); 
     } 
    } 

测试:

  var disposable = 
      Observable.Interval(TimeSpan.FromSeconds(2)) 
         .Do(Console.WriteLine) 
         .DeferDisconnection(TimeSpan.FromSeconds(5)) 
         .Subscribe(); 

      Console.ReadLine(); 

      disposable.Dispose(); 

      Console.ReadLine(); 
+0

我很困惑如何解决这个问题。看起来DeferDisconnection在处理之前会稍微等一下,但是如果窗口很大,这会导致应用等待很长时间才能关闭。那是对的吗? – jonnii

+0

@jonnii这就是为什么有一个超时参数 - 它在下一个值到达时断开,或者它需要太长时间,这是超时参数。 – Asti

-1

这工作,进行部分窗口证实被显示在后面。

class Program 
{ 
    public class Context 
    { 
     public int count; 
    } 

    static Context AggregateContext(Context c, long i) 
    { 
     c.count++; 
     return c; 
    } 

    static void OnWindow(Context c) { Console.WriteLine(c.count); } 

    static void Main(string[] args) 
    { 
     var canceled = new Subject<bool>(); 

     var observable = Observable.Interval(TimeSpan.FromSeconds(.1)).TakeUntil(canceled); 

     var windows = observable.Window(TimeSpan.FromSeconds(3)); 

     var aggregatedWindows = windows.SelectMany(
      window => window.Aggregate(new Context(), AggregateContext)); 

     var subscription = aggregatedWindows.Subscribe(OnWindow); 

     Thread.Sleep(TimeSpan.FromSeconds(10)); 

     canceled.OnNext(true); 
     subscription.Dispose(); 

     Console.WriteLine(@"Output should have been something like 30,30,30,30,10"); 
     Console.ReadLine(); 
    } 
} 
+0

为什么这会降低投票率? – jonnii