2017-02-14 42 views
6

我需要实现Rx.NET如下算法:在Rx.NET处理背压不onBackpressureLatest

  1. stream就拿最新的项目,或等待新的项目,而不阻塞,如果没有新项目。只有最新的项目很重要,其他人可以放弃。请输入SlowFunction并打印输出。从步骤
  2. 重复1

天真的解决方案是:

let PrintLatestData (stream: IObservable<_>) = 
    stream.Select(SlowFunction).Subscribe(printfn "%A") 

然而,这种解决方案并不因为平均stream发出物品超过SlowFunction速度可以消耗他们的工作。由于Select不会删除项目,而是尝试按从最旧到最新的顺序处理每个项目,因此随着程序运行,发送和打印的项目之间的延迟将逐渐趋于无穷大。为避免这种无限增长的背压,只应从流中采取最新的项目。

我搜索了文档,在RxJava中找到了一个名为onBackpressureLatest的方法,这对我来说可以做到我上面所描述的。但是,该方法在Rx.NET中不存在。如何在Rx.NET中实现这一点?

+1

做什么用'SlowFunction'比流慢的问题? –

+0

@FyodorSoikin如果'SlowFunction'比流缓慢,新项目的发射速度比它们可以处理和打印的速度要快。因此,随着程序的运行,正在打印的新项目与正在打印的项目的“SlowFunction”输出之间的延迟/延迟会朝向无限增长。这是不可接受的,因为我需要实时监控数据。我只关心最新的项目。 – Steve

+1

'SlowFunction'是否同步?还是Observable/Async? – Shlomo

回答

7

我想你想用ObserveLatestOn之类的东西。它有效地用一个值和一个标志替换传入事件的队列。

詹姆斯世界已经在博客在这里http://www.zerobugbuild.com/?p=192

的概念是在倾斜信任的速度有多快,服务器可以在其推数据GUI应用程序大量使用。

您还可以看到在无交易https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 和支持演示的实现解释ReactiveTrader https://leecampbell.com/presentations/#ReactConfLondon2014

需要明确的是,这是一个甩负荷算法,而不是一个背压算法。

+0

'ObserveLatestOn'正是我所需要的,谢谢! – Steve

+1

虽然这可能在理论上回答这个问题,但[这将是更可取的](// meta.stackoverflow.com/q/8259)在这里包括答案的基本部分,并提供供参考的链接。 – Draken

+0

或者指出它可能是http://stackoverflow.com/questions/6384312/how-can-i-observe-values-in-a-non-blocking-way-using-rx和http:/ /stackoverflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except-the-latest-value-when-my-subscribe-method-is/。我想这个答案是探测,我不确定他是否想要背压(按照标题)或卸载(根据说明) –

1

同步/异步建议可能会有所帮助,但是,由于慢速函数始终比事件流慢,因此异步可能允许您以((在线程池中观察)并行处理最终)只是用尽线程或通过上下文切换添加更多延迟。这听起来不像是我的解决方案。

我建议你看一下由Dave Sexton编写的开源Rxx'内省'操作符。这些可以改变最新的缓冲器/节流时间,因为队列由于消费缓慢而备份。如果慢速函数突然变得更快,它不会缓冲任何东西。如果变慢,它会缓冲更多。 您必须检查是否有'最新'类型,或者只是修改现有的以适应您的需求。例如。使用缓冲区,只需缓冲区中的最后一项,或进一步增强内部只存储最新的。 Google'Rxx',你会在Github的某处找到它。

如果“慢速功能”的时间相当可预测,则更简单的方法是简单地将流量限制在超过此时间的数量。显然,我并不是指标准的rx'油门',而是一个通过更新而不是旧的更新。这里有很多解决这类问题的方法。

1

前段时间我也遇到了同样的问题,而且我没有找到一个内置的操作符来完成这个操作。所以我写了我自己的,我叫Latest。不是微不足道的实施,而是发现它在我目前的项目中非常有用。

它的工作原理如下:当观察者正忙于处理先前的通知(当然是在它自己的线程上)时,它会尽快将队列中的最后n个通知(n> = 0)和OnNext作为观察者它变得空闲。所以:

  • Latest(0):只观察到达项目,而观察者处于闲置状态
  • Latest(1):始终遵守最新的
  • Latest(1000)(EG):一般处理所有的项目,但如果东西卡住了线,而想念一些比得到一个OutOfMemoryException
  • Latest(int.MaxValue):不要错过任何一个项目,但负载平衡生产者和消费者之间。因此

您的代码将是:stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")

签名看起来是这样的:

/// <summary> 
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process. 
/// </summary> 
/// <param name="source">The source sequence.</param> 
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param> 
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param> 
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> 
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception> 
/// <remarks> 
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready. 
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any. 
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>. 
/// </remarks> 
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null) 

实现太大,张贴在这里,但如果有人有兴趣,我会高兴地分享。让我知道。

1

你可以在sample流中间隔你知道SlowFunction可以处理。以下是一个Java的例子:

TestScheduler ts = new TestScheduler(); 

Observable<Long> stream = Observable.interval(1, TimeUnit.MILLISECONDS, ts).take(500); 
stream.sample(100, TimeUnit.MILLISECONDS, ts).subscribe(System.out::println); 

ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS); 
98 
198 
298 
398 
498 
499 

sample不会导致反压始终抓住流中的最新值,所以它符合您的要求。另外sample将不会发送两次相同的值(可以从上面看到,因为499只打印一次)

我认为这将是一个有效的C#/F#解决方案:

static IDisposable PrintLatestData<T>(IObservable<T> stream) { 
    return stream.Sample(TimeSpan.FromMilliseconds(100)) 
     .Select(SlowFunction) 
     .Subscribe(Console.WriteLine); 
} 
let PrintLatestData (stream: IObservable<_>) = 
    stream.Sample(TimeSpan.FromMilliseconds(100)) 
     .Select(SlowFunction) 
     .Subscribe(printfn "%A") 
+1

这是一个非常简单的解决方案,但在我的特殊用例SlowFunction的运行时间变化太大,这种方法的工作。感谢分享。 – Steve