2010-09-16 20 views
3

肯定有人在那里解决这个问题。 想象一下,我有一个班级定期提出一个关于值变化的事件(例如PropertyChanged) 这个价值无非就是金钱。与Rx运行总和

现在,我想利用Rx,以便获得最近10分钟增加的总和。例如BufferWithTime没有帮助,因为我总是需要最后10分钟。

任何想法,我可以做到这一点?下面

TIA 马丁

回答

3

的解决方案涉及保持相关事件数据的状态,在过去的10分钟内使用Observable.Scan列表。 State被维护为一个元组列表,其值为int(货币)和DateTime

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h); 
var runningSums = 
    events.Scan(new List<Tuple<int, DateTime>>(), 
       (l, e) => 
       { 
        var now = DateTime.Now; 
        // Add last event data to list. 
        l.Add(Tuple.Create(e.EventArgs.Money, now)); 
        // Return the correct part of the list (everything 
        // from the last ten minutes). 
        return l.Where(t => (now - t.Item2) < 
            TimeSpan.FromMinutes(10)).ToList(); 
       }) 
      .Select(l => l.Sum(t => t.Item1)); 
runningSums.Subscribe(sum => Console.WriteLine(sum)); 

编辑:例子并不为每个事件返回一个新的列表:

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h); 
var runningSums = 
    events.Scan(Tuple.Create(new List<Tuple<int, DateTime>>(), 
          DateTime.Now - TimeSpan.FromMinutes(10)), 
       (l, e) => 
       { 
        var now = DateTime.Now; 
        l.Item1.Add(Tuple.Create(e.EventArgs.Nr, now)); 
        // if (trimming-condition) then trim front of list... 
        return Tuple.Create(l.Item1, now - TimeSpan.FromMinutes(10)); 
       }) 
      .Select(l => l.Item1.Where(t => t.Item2 > l.Item2).Sum(t => t.Item1)); 
runningSums.Subscribe(sum => Console.WriteLine(sum)); 
+0

不坏的肯定和做什么它应该。但是你可能想要调查是否用每个新元素处理返回一个新列表是一个好主意。 – pointernil 2010-09-16 18:04:55

+1

我也不完全满意。但是,您必须拥有包含时间戳的值列表才能执行此操作。尽管可变名单,这是一个完全有效的方法。你也可以做的不是每次创建一个新的列表,而是保持相同的列表,并使“Scan”方法的开始时间(过去总是10分钟)成为状态的一部分。然而,现在名单只有不断增长,所以你必须找到一种方法来修剪前线。我也为此添加了一个例子(没有修剪,但可以很容易地添加)。 – 2010-09-16 18:32:02

1

好了,看看下面的解决方案。它建立在先前提出的解决方案的基础上,但为了效率(和可读性,我认为),却放弃了纯粹的功能风格。它重用藏汉内置的Timestamped类型跟踪时间...

欢呼

public static class RxEntentsions 
     { 
      class TimeLimitedList<T> 
      { 
       public List<Timestamped<T>> Values = new List<Timestamped<T>>(); 
       TimeSpan span; 
       public TimeLimitedList(TimeSpan sp) { span = sp; } 
       public void Add(Timestamped<T> v) 
       { 
        Values.Add(v); 
        Values.RemoveAll(a => a.Timestamp < (DateTime.Now - span)); 
       } 
      } 

      public static IObservable<List<Timestamped<TSource>>> SlidingWindow<TSource>(this IObservable<Timestamped<TSource>> source, TimeSpan slidingWindow) 
      { 
       return source.Scan0(new TimeLimitedList<TSource>(slidingWindow), (acc, v) => { acc.Add(v); return acc; }).Select(a => a.Values); 
      } 
     } 


    static void Main(string[] args) 
    { 
     var gen = Observable.Interval(TimeSpan.FromSeconds(0.25d)).Timestamp(); 
     gen.SlidingWindow(TimeSpan.FromSeconds(1)).Subscribe(slw => {slw.ForEach(e=> Console.WriteLine(e)); Console.WriteLine("--------");}); 
     Console.ReadLine(); 
    }