这里是一个刺来回答我的问题如果您发现任何问题,请告诉我。
public static class ObservableHelper
{
/// <summary>
/// Buffers entries that do no satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max
/// capacity. When an entry that satisfies the condition ocurrs, then it flushes the circular buffer and the new entry,
/// and starts buffering again.
/// </summary>
/// <typeparam name="T">The type of entry.</typeparam>
/// <param name="stream">The original stream of events.</param>
/// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param>
/// <param name="bufferSize">The buffer size for accumulated entries.</param>
/// <returns>An observable that has this filtering capability.</returns>
public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize)
{
if (stream == null) throw new ArgumentNullException("stream");
if (shouldFlush == null) throw new ArgumentNullException("shouldFlush");
if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize");
return System.Reactive.Linq.Observable.Create<T>(observer =>
{
var buffer = new CircularBuffer<T>(bufferSize);
var subscription = stream.Subscribe(
newItem =>
{
bool result;
try
{
result = shouldFlush(newItem);
}
catch (Exception ex)
{
return;
}
if (result)
{
foreach (var buffered in buffer.TakeAll())
{
observer.OnNext(buffered);
}
observer.OnNext(newItem);
}
else
{
buffer.Add(newItem);
}
},
observer.OnError,
observer.OnCompleted);
return subscription;
});
}
}
顺便说一下,CircularBuffer不是开箱即用,但实现很简单。
然后我就叫:
data
.FlushOnTrigger(item => item == 'X', bufferSize: 2)
.Subscribe(Console.WriteLine);
除了论证检查之外,我只看到一个主要的疏漏。您不会传递OnError或OnCompleted通知。 (通过'observer.OnError'和'observer.OnCompleted'到'Subscribe'调用你应该在这里工作。)我怀疑'Synchronize'调用总是需要的,并且通常会留给调用者来插入时需要。与问题的RX部分无关,但我期望'CircularBuffer'直接实现'IEnumerable',而不需要'ReadAll'方法。 – 2013-03-14 04:38:00
这是什么'Synchronize()'运算符? – AlexFoxGill 2013-03-14 09:17:55
感谢您的反馈,我删除了Synchronize调用。有一个TakeAll方法的原因是,我想清楚(并强制),当我读取整个缓冲区时,它会自动清空(而不是枚举项目,然后清除缓冲区) – 2013-08-23 20:49:20