2017-09-26 146 views
2

我想要消耗一个可随时填充的IObservable。从顶部排出IObservable

我有这样的扩展方法:

public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
{ 
    return Observable.Defer(() => 
    { 
     BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 

     return source 
      .Zip(queue, (v, q) => v) 
      .SelectMany(v => selector(v) 
       .Do(_ => 
       { 

       },() => 
       { 
        queue.OnNext(new Unit()); 
       }) 
      ); 
    }); 
} 

我像如下使用:

_moviesToTranslateObservable = new Subject<IMovie>(); 
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250))) 
    .Subscribe(async movieToTranslate => 
     { 
     } 

一旦新项目推:

_moviesToTranslateObservable.OnNext(movieToTranslate); 

的的IObservable是消耗。

我的问题是,当我添加很多项目时,我不想消耗第一个已添加的,但最后添加的(如堆栈,而不是队列)。

我该如何做到这一点? BehaviorSubject是否适合堆栈消耗行为?

回答

2

我知道变量名称说queue,但那BehaviorSubject不是一个真正的队列,它更像是一个锁。排队真的发生在Zip函数中,该函数携带一个内部队列。至于FIFO和LIFO之间的切换,我不确定你想要什么标准,但这里有一个FIFO版本Drain

public static IObservable<TOut> DrainReverse<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
{ 
    return Observable.Defer(() => 
    { 
     BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 
     var stack = new Stack<TSource>(); 

     return source 
      .Do(item => stack.Push(item)) 
      .Zip(queue, (v, q) => v) 
      .Select(_ => stack.Pop()) 
      .SelectMany(v => selector(v) 
       .Do(_ => 
       { 

       },() => 
       { 
        queue.OnNext(new Unit()); 
       }) 
      ); 
    }); 
} 

当与以下运行的代码中使用:

var s = new Subject<int>(); 
var d = s.DrainReverse(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(250))); 
d.Subscribe(i => Console.WriteLine(i)); 
s.OnNext(0); 
s.OnNext(1); 
s.OnNext(2); 
s.OnNext(3); 
s.OnNext(4); 
s.OnNext(5); 

哪个正确产生0, 5, 4, 3, 2, 1

+0

正是我一直在寻找!非常感谢 – Ben