2012-07-01 52 views
6

我有一堆事件进来,我必须毫不犹豫地执行所有这些事件,但我想确保它们在适当的时间段被缓冲和使用。任何人都有解决方案?什么是使用Observable“限制”消费的最佳方式?

我找不到任何的Rx运营商可以做,没有事件的损失(油门 - 失去的事件)。我也考虑过缓冲,延迟等等......找不到一个好的解决方案。

我试图把一个计时器在中间,但不知何故,没有在所有的工作:

GetInitSequence() 
      .IntervalThrottle(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime) 
    { 
     return Observable.Create<T>(o => 
      { 
       return source.Subscribe(x => 
        { 
         new Timer(state => 
          o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1)); 
        }, o.OnError, o.OnCompleted); 
     }); 
    } 
+0

你可以添加一个大理石运营商digram显示你有什么,以及你想要什么?像其他人一样,我不确定你想要实现什么,因为我认为缓冲区就好。 –

+0

你是什么限制? – Fredrick

回答

10

的问题是不是100%清楚,所以我正在做一些假设。

Observable.Delay是不是你想要的,因为这将创建一个从当每个事件到达,而不是为处理创建等时间间隔的延迟。

Observable.Buffer是不是你想要的,因为这将导致在每个给定的时间间隔的所有事件将被传递给你,而不是一次一个。

因此,我相信你正在寻找一种解决方案,创建某种类型的节拍器剔除,并给你一个事件每个滴答。这可能是天真使用Observable.Interval节拍器和Zip它连接到您的源构成:

var source = GetInitSequence(); 
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));  
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now)); 

这将触发每5秒(在上面的例子),并给你在序列中的原始项目。

这个解决方案的唯一问题是,如果你没有更多的源元素(比如说)10秒,当源元素到达时,他们会立即发送出去,因为一些'触发器'事件正在在那里等着他们。该场景的大理石图:

source: -a-b-c----------------------d-e-f-g 
trigger: ----o----o----o----o----o----o----o 
result: ----a----b----c-------------d-e-f-g 

这是一个非常合理的问题。有两个问题在这里已经是解决它:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

提供的解决方案主要Drain扩展方法和二次Buffered扩展。我已经修改了这些更简单(不需要Drain,只需使用Concat)。用法是:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5)); 

扩展方法StepInterval

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 
+0

谢谢。仍然不是我一直在寻找的东西,但它向我展示了一些想法。我只是对Rx感到沮丧 - 为什么它应该如此复杂并且没有适当的文档。学习曲线陡峭,需要对主题有广泛的了解才能获得有价值的东西。 #fail – IgorM

+1

同意。这就是为什么我花了很多时间写IntroToRx.com来帮助你的职位。这很难,而且有很多东西需要学习。 –

+0

我真的发现这些Rx操作员很难阅读和推理。我认为这是我的局限 - 这可能是因为我有一个视觉的头脑,我无法想象结果。是否有机会获得这个答案中的代码的大理石图? –

0

如何Observable.Buffer?这应该将1s窗口中的所有事件作为单个事件返回。

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5)); 
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); }); 

这可能是你所问的不清楚。你的代码应该做什么?看起来你只是通过为每个事件创建一个计时器来延迟。它也打破了观察者的语义,因为下一个完成可能会发生。

注意这也只是在使用的计时器准确。典型的定时器精确到16ms。

编辑:

你的榜样变得和项目包含窗口中的所有事件:

GetInitSequence() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 
1

我知道这可能只是太简单,但将这项工作?

var intervaled = source.Do(x => { Thread.Sleep(100); }); 

基本上这只是在值之间放置一个最小延迟。太简单了?

+0

这匹配和修复了OP的行为IntervalThrottle是否真的明智? –

+1

Eeek ...阻止线程!?面对Rx校长,这种情况是对的? –

+0

是的,从它最纯粹的意义上说,这是违反Rx的,但要求是阻止 –

1

随着Enigmativity的答案的线,如果你想要做的只是延迟全部由一个时间跨度值的,我不能明白为什么Delay是不是你想要

GetInitSequence() 
     .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here 
     .Subscribe(
      item => 
       { 
        Console.WriteLine(DateTime.Now); 
        // Process item 
       } 
     ); 
相关问题