2012-11-27 28 views
7

我有一个程序,我正在接收事件并希望分批处理它们,以便在处理当前批次时进入的所有项目都将出现在下一批次中。无效扩展:缓冲区直到用户空闲

Rx中简单的TimeSpan和基于计数的缓冲区方法会给我多批次的项目,而不是给我一大堆所有进来的项目(在订户需要的时间超过指定的TimeSpan或超过N项目进来,N大于计数)。

我看着使用更复杂的缓冲区过载,其中采取Func<IObservable<TBufferClosing>>IObservable<TBufferOpening> and Func<TBufferOpening, IObservable<TBufferClosing>>,但我找不到如何使用这些的例子,更不知道如何将它们应用于我想要做的。

+0

[本页](http://leecampbell.blogspot.com.au/2011/03/rx-part-9join-window-buffer-and-group.html)可能有助于缓冲区过载。整个系列非常有帮助 –

+0

您是否在TPL数据流中尝试过'BufferBlock'? – Asti

回答

1

这是做你想做的吗?

var xs = new Subject<int>(); 
var ys = new Subject<Unit>(); 

var zss = 
    xs.Buffer(ys); 

zss 
    .ObserveOn(Scheduler.Default) 
    .Subscribe(zs => 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine(String.Join("-", zs)); 
     ys.OnNext(Unit.Default); 
    }); 

ys.OnNext(Unit.Default); 
xs.OnNext(1); 
Thread.Sleep(200); 
xs.OnNext(2); 
Thread.Sleep(600); 
xs.OnNext(3); 
Thread.Sleep(400); 
xs.OnNext(4); 
Thread.Sleep(300); 
xs.OnNext(5); 
Thread.Sleep(900); 
xs.OnNext(6); 
Thread.Sleep(100); 
xs.OnNext(7); 
Thread.Sleep(1000); 

我的结果:

1-2-3 
4-5 
6-7 
+0

删除ObserveOn使程序在一个线程上执行导致它中断。 –

+0

@ChrisEldredge - 是的。您可能必须允许多线程才能使其发挥作用。 – Enigmativity

+0

当队列为空时,它也会导致忙碌的等待(将CPU挂在100%处)。如果您在zs.Count == 0时删除睡眠,您将看到尖峰。 –

1

你需要什么东西来缓冲值,然后当工人 准备,它要求当前缓冲区,然后重置它。这可以 与RX和任务的组合

class TicTac<Stuff> { 

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>(); 

    List<Stuff> in = new List<Stuff>(); 

    public void push(Stuff stuff){ 
     lock(this){ 
      if(in == null){ 
       in = new List<Stuff>(); 
       Items.SetResult(in); 
      } 
      in.Add(stuff); 
     } 
    } 

    private void reset(){ 
     lock(this){ 
      Items = new TaskCompletionSource<List<Stuff>>(); 
      in = null; 
     } 
    } 

    public async Task<List<Stuff>> Items(){ 
     List<Stuff> list = await Items.Task; 
     reset(); 
     return list; 
    } 
} 

然后

var tictac = new TicTac<double>(); 

IObservable<double> source = .... 

source.Subscribe(x=>tictac.Push(x)); 

然后做你的工人

while(true){ 

    var items = await tictac.Items(); 

    Thread.Sleep(100); 

    for each (item in items){ 
     Console.WriteLine(item); 
    } 

} 
+1

我想这是可行的,但为什么要直接使用TPL/APM来使用Reactive Extensions? –

+0

ReactiveExtensions是一个框架,你可以随意扩展它,我经常为自定义的东西添加自己的操作符 – bradgonesurfing

+2

@ChrisEldredge公平地说,你所要求的不适合Rx框架;订户无法向观察者发出信号,表示它闲置或不在,你必须这样做。 – casperOne

1

之前,我已经做到了这一点的方法是拉起观察DotPeek/Reflector中的方法,并采用它具有的排队概念并使其适应我们的要求。例如,在具有快速滴答数据的UI应用程序(如财务)中,UI线程可能会充满事件,有时它不能足够快速地更新。在这些情况下,我们希望删除除最后一个以外的所有事件(针对特定仪器)。在这种情况下,我们将ObserveOn的内部队列更改为T的单个值(查找ObserveLatestOn(IScheduler))。在你的情况下,你想要队列,但是你想推动整个队列不只是第一个值。这应该让你开始。