2016-08-26 38 views
0

我有一个filewatcher,从中观察创建和更改的事件。 我希望当第一个事件被触发(创建或改变)时,它需要开始缓冲10秒,并在那10秒后我想处理缓冲的事件。触发第一个事件几秒钟后的RX缓冲区事件

我得已经是这样的:

Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Created") 
       .Merge(Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Changed")) 
       .Buffer(TimeSpan.FromSeconds(10)) 
       .Subscribe(list => 
       { 
        Debug.WriteLine("Do something"); 
       }); 

此代码 '的Debug.WriteLine( “做什么”);'每10秒钟一次。

编辑: 好吧,让我试着用时间线来解释它。

  1. 文件观察器空闲,没有事件触发。
  2. 一段未知的时间后的文件被放置在目录被触发
  3. 创建的事件
  4. 可观察名单开始,持续10秒
  5. 缓冲(所有事件)的10秒订阅操作后被执行,它会处理所有的事件一次

希望这会使事情清楚

+1

你目前的代码有什么问题?你为什么不使用'list'?或者你想单独处理列表中的每个项目? –

+0

你只想让它触发一次或什么?这听起来像你的解决方案是适当的。 – Shlomo

+0

在旁注中:您应该保留对订阅的引用,并在您不再对这些事件感兴趣时进行处置。如果您不处理订阅并且超出范围,那么它可能仍然对该事件保持强烈的引用(并且可能导致泄漏)。 –

回答

2

我假设你想要的以下行为:

  1. 初始事件发生后,缓冲所有事件在接下来的10秒。
  2. 一旦这个10秒的窗口关闭,下一个应该在10秒后为所有事件触发一个新的10秒缓冲区。

所以我们假设我们有5个事件在5秒内均匀分布,13秒间隔,然后在5秒内均匀分布另外5个事件。大理石图是这样的:

timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27 
events : x--x--x--x--x-------------------------------------x--x--x--x--x------------------ 
stdbuff : |----------------------------|-----------------------------|--------------------- 
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return 

使用直线前进Buffer的是,它看起来像上面谱写的stdbuff,而分手事件的第二组分为两组,造成两人列出了问题第二组活动:一个有三个活动,一个有两个活动。您需要一个列表(针对第二组),使用类似desired流的逻辑。开始在0捕获,返回列表在10.开始在17捕获,在27返回列表。

如果我误解了你(再次),请张贴大理石图表,类似于上面,代表你想要如何事情要工作。


假设我理解你正确,下面的代码将工作...

//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created)) 
// .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed))); 

    //Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier. 
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5) 
    .Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13))) 
    .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)); 

initialSource 
    .Publish(_source => _source 
     .Buffer(_source 
      .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary) 
      .DistinctUntilChanged() 
      .Delay(TimeSpan.FromSeconds(10)) 
     ) 
    ) 
    .Subscribe(list => 
    { 
     Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}"); 
     Debug.WriteLine($"List Count: {list.Count}"); 
    }); 

说明

首先,我们需要确定 '主要事件',那些代表BeginCapture上面的desired流描述中的注释。可以发现这样的:

var primaryEvents = initialSource 
     .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary) 
     .DistinctUntilChanged(); 

一旦我们有了BeginCapture事件,它可以表示一个窗口开放,这是很容易找到Return事件,或闭窗:

var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10)); 

在实践中,因为没有密切和开放的,我们关心之间发生的,我们只需要担心关闭事件,所以我们可以把它收缩到这一点:

var closeEvents = initialSource 
     .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary) 
     .DistinctUntilChanged() 
     .Delay(TimeSpan.FromSeconds(10)); 

把它插入到BuffercloseEvents作为bufferBoundaries

var bufferredLists = initialSource 
    .Buffer(initialsource 
     .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary) 
     .DistinctUntilChanged() 
     .Delay(TimeSpan.FromSeconds(10)) 
    ); 

最后,因为我们有多个订阅initialSource,我们需要使用Publish确保并发正常工作,导致最终的答案。

+1

请写出您希望如何工作的时间表。 – Shlomo

+0

Rx中的Windows和缓冲区很棘手。 – Shlomo

+0

这确实有效。我认为会有一个更简单的解决方案。阅读这里发生的事情有点难,这并不能真正改善这个项目的维护。我不是你看到的唯一一个正在工作的人。 –

相关问题