2017-10-28 167 views
0

我遇到了RX.net的背压问题,我找不到解决方案。我有一个可观察的实时日志消息流。RX.net的DropQueue机制

var logObservable = /* Observable stream of log messages */ 

,我想通过他们正在通过网络发送之前,该串行从logObservable实时日志消息的TCP接口暴露。所以,我做到以下几点:

foreach (var message in logObservable.ToEnumerable()) 
{ 
    // 1. Serialize message 
    // 2. Send it over the wire. 
} 

.ToEnumerable()问题出现,如果回压的情况发生例如如果另一端的客户端暂停流。问题是.ToEnumerable()缓存导致大量内存使用的项目。我正在寻找一种类似于DropQueue的机制,它只缓冲最后10条消息,例如

var observableStream = logObservable.DropQueue(10).ToEnumerable(); 

这是解决此问题的正确途径吗?你知道实施这样的机制,以避免可能的背压问题?

+0

'。取(10).toenumerable()'会的工作不是吗? –

+0

我想通过电线连续传输日志消息。如果我按照你的建议做,它不仅需要10条日志消息,然后完成可观察流?我试图解决的问题是,如果客户端速度太慢而无法检索日志消息或暂停流,它应该只缓存10件物品,而不是无数的物品。 – SOK

+0

'.Throttle(...)'或'.Sample(..)'? – Enigmativity

回答

0

DropQueue实现:

public static IEnumerable<TSource> ToDropQueue<TSource>(
     this IObservable<TSource> source, 
     int queueSize, 
     Action backPressureNotification = null, 
     CancellationToken token = default(CancellationToken)) 
    { 
     var queue = new BlockingCollection<TSource>(new ConcurrentQueue<TSource>(), queueSize); 
     var isBackPressureNotified = false; 

     var subscription = source.Subscribe(
      item => 
      { 
       var isBackPressure = queue.Count == queue.BoundedCapacity; 

       if (isBackPressure) 
       { 
        queue.Take(); // Dequeue an item to make space for the next one 

        // Fire back-pressure notification if defined 
        if (!isBackPressureNotified && backPressureNotification != null) 
        { 
         backPressureNotification(); 
         isBackPressureNotified = true; 
        } 
       } 
       else 
       { 
        isBackPressureNotified = false; 
       } 

       queue.Add(item); 
      }, 
      exception => queue.CompleteAdding(), 
      () => queue.CompleteAdding()); 

     token.Register(() => { subscription.Dispose(); }); 

     using (new CompositeDisposable(subscription, queue)) 
     { 
      foreach (var item in queue.GetConsumingEnumerable()) 
      { 
       yield return item; 
      } 
     } 
    }