2013-03-12 107 views
3

我在C#中使用Reactive Extensions(Rx)并希望以下面的方式过滤事件。想象一下,我有以下的鼻祖序列:Rx:如何缓冲事件(环形缓冲区)并且只在发生特殊事件时才刷新它们?

ABCDEF X GHI XĴXX KLMNO X P

我想产生以下的输出:

EF X HI X J XX NO X

基本上,我会缓冲(油门?)事件与最大界限(在这个例子中这个界限是2),当我得到某个事件(在这种情况下事件X),我想刷新缓冲区输出并重新开始缓冲,直到再次看到特殊事件。

我正在尝试一些方法,但没有任何运气,我想应该有一个简单的方法来完成它,我失踪了。编辑:一个约束,是我期望得到TONS被抛弃的事件,并且只有X的几个实例,所以在内存中保留一个具有数千个事件的缓冲区,只读取最后2个(或20个)事件,并不是一个真正的选择。

回答

0

这里是一个刺来回答我的问题如果您发现任何问题,请告诉我。

public static class ObservableHelper 
{ 
    /// <summary> 
    /// Buffers entries that do no satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max 
    /// capacity. When an entry that satisfies the condition ocurrs, then it flushes the circular buffer and the new entry, 
    /// and starts buffering again. 
    /// </summary> 
    /// <typeparam name="T">The type of entry.</typeparam> 
    /// <param name="stream">The original stream of events.</param> 
    /// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param> 
    /// <param name="bufferSize">The buffer size for accumulated entries.</param> 
    /// <returns>An observable that has this filtering capability.</returns> 
    public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize) 
    { 
     if (stream == null) throw new ArgumentNullException("stream"); 
     if (shouldFlush == null) throw new ArgumentNullException("shouldFlush"); 
     if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize"); 

     return System.Reactive.Linq.Observable.Create<T>(observer => 
     { 
      var buffer = new CircularBuffer<T>(bufferSize); 
      var subscription = stream.Subscribe(
       newItem => 
        { 
         bool result; 
         try 
         { 
          result = shouldFlush(newItem); 
         } 
         catch (Exception ex) 
         { 
          return; 
         } 

         if (result) 
         { 
          foreach (var buffered in buffer.TakeAll()) 
          { 
           observer.OnNext(buffered); 
          } 

          observer.OnNext(newItem); 
         } 
         else 
         { 
          buffer.Add(newItem); 
         } 
        }, 
       observer.OnError, 
       observer.OnCompleted); 

      return subscription; 
     }); 
    } 
} 

顺便说一下,CircularBuffer不是开箱即用,但实现很简单。

然后我就叫:

 data 
      .FlushOnTrigger(item => item == 'X', bufferSize: 2) 
      .Subscribe(Console.WriteLine); 
+0

除了论证检查之外,我只看到一个主要的疏漏。您不会传递OnError或OnCompleted通知。 (通过'observer.OnError'和'observer.OnCompleted'到'Subscribe'调用你应该在这里工作。)我怀疑'Synchronize'调用总是需要的,并且通常会留给调用者来插入时需要。与问题的RX部分无关,但我期望'CircularBuffer'直接实现'IEnumerable',而不需要'ReadAll'方法。 – 2013-03-14 04:38:00

+1

这是什么'Synchronize()'运算符? – AlexFoxGill 2013-03-14 09:17:55

+0

感谢您的反馈,我删除了Synchronize调用。有一个TakeAll方法的原因是,我想清楚(并强制),当我读取整个缓冲区时,它会自动清空(而不是枚举项目,然后清除缓冲区) – 2013-08-23 20:49:20

1

为方便起见,我们需要以下两个扩展功能:

public static class Extensions 
{ 
    public static IObservable<IList<TSource>> BufferUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate) 
    { 
     var published = source.Publish().RefCount(); 
     return published.Buffer(() => published.Where(predicate)); 
    } 

    public static IEnumerable<TSource> TakeLast<TSource>(this IEnumerable<TSource> source, int count) 
    { 
     return source.Reverse().Take(count).Reverse(); 
    } 
} 

然后,我们解决问题,像这样:

source.BufferUntil(c => c == 'X') 
    .SelectMany(list => list.TakeLast(3)) 

输出:

E F X H I X J X X N O X 
+0

谢谢,它的工作,但在这里使用一个缓冲意味着,直到你得到一个X,你将有可能缓冲数千个事件,只是为了利用最新的。如果不需要的话,我想要一些甚至没有提及过去事件的东西。 – 2013-03-13 21:31:38

1

我会捎带上我在这里发布的另一个答案: Trouble Implementing a Sliding Window in Rx

最重要的一点是此扩展方法:

public static class Ext 
{ 
    public static IObservable<IList<T>> SlidingWindow<T>(
     this IObservable<T> src, 
     int windowSize) 
    { 
     var feed = src.Publish().RefCount();  
     // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
     return Observable.Zip(
     Enumerable.Range(0, windowSize) 
      .Select(skip => feed.Skip(skip)) 
      .ToArray()); 
    } 
} 

您可以正是如此使用:

void Main() 
{ 
    // A faked up source 
    var source = new Subject<char>(); 

    var bufferSize = 2; 
    Func<char, bool> eventTrigger = c => c == 'X'; 

    var query = source 
     .Publish() 
     .RefCount() 
     // Want one extra slot to detect the "event" 
     .SlidingWindow(bufferSize + 1) 
     .Where(window => eventTrigger(window.Last())) 
     .Select(buffer => buffer.ToObservable()) 
     .Switch(); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     source.OnNext('A'); 
     source.OnNext('B'); 
     source.OnNext('C'); 
     source.OnNext('D'); 
     source.OnNext('E'); 
     source.OnNext('F'); 
     source.OnNext('X'); 
     source.OnNext('G'); 
     source.OnNext('H'); 
     source.OnNext('I'); 
     source.OnNext('X'); 
     Console.ReadLine(); 
    }  
} 

输出:

E 
F 
X 
H 
I 
X