前段时间我也遇到了同样的问题,而且我没有找到一个内置的操作符来完成这个操作。所以我写了我自己的,我叫Latest
。不是微不足道的实施,而是发现它在我目前的项目中非常有用。
它的工作原理如下:当观察者正忙于处理先前的通知(当然是在它自己的线程上)时,它会尽快将队列中的最后n个通知(n> = 0)和OnNext
作为观察者它变得空闲。所以:
Latest(0)
:只观察到达项目,而观察者处于闲置状态
Latest(1)
:始终遵守最新的
Latest(1000)
(EG):一般处理所有的项目,但如果东西卡住了线,而想念一些比得到一个OutOfMemoryException
Latest(int.MaxValue)
:不要错过任何一个项目,但负载平衡生产者和消费者之间。因此
您的代码将是:stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")
签名看起来是这样的:
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
实现太大,张贴在这里,但如果有人有兴趣,我会高兴地分享。让我知道。
做什么用'SlowFunction'比流慢的问题? –
@FyodorSoikin如果'SlowFunction'比流缓慢,新项目的发射速度比它们可以处理和打印的速度要快。因此,随着程序的运行,正在打印的新项目与正在打印的项目的“SlowFunction”输出之间的延迟/延迟会朝向无限增长。这是不可接受的,因为我需要实时监控数据。我只关心最新的项目。 – Steve
'SlowFunction'是否同步?还是Observable/Async? – Shlomo