2013-08-03 53 views
5

我写了一些代码,将FileSystemWatcher的事件变成可观察的序列。这个Reactive Extensions代码是否会泄漏内存?

我的目标是两次拆分所有文件系统的变化,以分离流并限制它们。

例如,如果我有10个不同的文件在半秒内变化3次,我只会为每个文件得到一次通知。

虽然我担心的是GroupBy()运营商。为了这个工作,(我认为)它需要随着时间的推移不断建立组,并消耗少量的内存。

这会造成“泄漏”,如果是这样,我该如何防止它?

FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") { 
    EnableRaisingEvents = true, 
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size 
}; 

void Main() 
{ 
    var fileSystemEventStream = 
     Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs> 
      (
       _ => _watcher.Changed += _, 
       _ => _watcher.Changed -= _ 
      ) 
      .ObserveOn(ThreadPoolScheduler.Instance) 
      .SubscribeOn(ThreadPoolScheduler.Instance) 
      .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath) 
      ; 

    var res = 
     from fileGroup in fileSystemEventStream 
     from file in fileGroup.Throttle(TimeSpan.FromSeconds(1)) 
     select file; 

    res.Subscribe(
     ReceiveFsFullPath, 
     exception => { 
      Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace); 
     }); 

    Console.Read(); 
} 

void ReceiveFsFullPath(string s){ 
    Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId); 
    Console.WriteLine(s); 
} 
+0

如果你相信c#garnage收集器,而你不应该这样做,那就不会有泄漏 –

+0

但是这个组织持续引用同一个文件。我明白C#是垃圾收集,但这并不意味着内存泄漏不会发生。 –

+0

您是否尝试过添加[SciTech的Memory Profiler](http://memprofiler.com/)等探查器来查看您是否在泄漏? –

回答

3

是的,对于每个新密钥,GroupBy创建一个Subject,并维护这些Subject的字典。你正在订阅每一个这些。所以这是一小部分内存,随着时间的推移,这些内存将不断增加,无法发布旧条目。当油门计时器到期时,您真正需要的是将钥匙移除。我想不出与内置运营商做到这一点的方法。所以你需要一个自定义操作符。这是一个刺伤。

public IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay) 
{ 
    return Observable.Create(observer => 
    { 
     var notifications = new Subject<IObservable<T>>(); 
     var subscription = notifications.Merge().Subscribe(observer); 
     var d = new Dictionary<T, IObserver<T>>(); 
     var gate = new object(); 
     var sourceSubscription = new SingleAssignmentDisposable(); 
     var subscriptions = new CompositeDisposable(subscription, sourceSubscription); 
     sourceSubscription.Disposable = source.Subscribe(value => 
     { 
      IObserver<T> entry; 
      lock(gate) 
      { 
      if (d.TryGetValue(value, out entry)) 
      { 
       entry.OnNext(value); 
      } 
      else 
      { 
       var s = new Subject<T>(); 
       var o = s.Throttle(delay).FirstAsync().Do(() => 
       { 
       lock(gate) 
       { 
        d.Remove(value); 
       } 
       }); 
       notifications.OnNext(o); 
       d.Add(value, s); 
       s.OnNext(value); 
      } 
      } 
     }, observer.OnError, notifications.OnCompleted); 

     return subscriptions; 
    }); 
} 

... 
Observable.FromEventPattern(...) 
    .Select(e => e.EventArgs.FullPath) 
    .ThrottleDistinct(TimeSpan.FromSeconds(1)) 
    .Subscribe(...); 
+1

欣赏你花时间写作答案的时间。我在MSDN论坛上看到了这个线程。 http://social.msdn.microsoft.com/Forums/en-US/53a66a85-4ab3-4fe9-96c6-8b72cc034d0e/groupbyuntil-usage可能有一个到期组的大小最大为1的答案是?如果有内置机制,我想避免使用锁编写代码:) –

+0

是的。我不知道GroupByUntil!它的API有点尴尬,但这应该工作:'filenames.GroupByUntil(f => f,g => g.Throttle(delay))。SelectMany(g => g.LastAsync()); – Brandon

1

根据布兰登的回复,主题将会增长并且无法被回收*。我在这里泄漏的内存的主要关注点是你不捕获订阅!即

res.Subscribe(... 

必须

subscription = res.Subscribe(... 

更换,如果你不抓住订阅,你永远无法处置的认购,从而你永远不放开的事件处理程序,因此你可以“泄漏内存”。显然这是没用的,如果你不在某处实际处理订阅。

*那么,如果他们完成了,那么他们会自动处置,所以这将工作。当FileDeleted事件发生时您可以查看完成序列吗?

+0

我正在捕获订阅,我只是没有添加这些代码。我基本上已经将查询从我们的代码库中提取出来,并使LINQPad变得友好。即使如此,如果这个查询运行的应用程序的生命和订阅没有结束,那么我们有一个“泄漏”:) –

+0

感谢您的澄清。 GroupByUntil似乎是你正在寻找的东西(尽管你仍然没有解释你想在什么条件下发布序列) –

+1

根据定义的问题,你可以释放序列,只要它的空闲时间长于节流时间。在原始版本的代码和使用“GroupByUntil”的版本之间的输出序列中没有“可见的”区别,除非后者在空闲时释放资源。 – Brandon

相关问题