2016-10-31 27 views
1

我需要如何分发聚集更新一个好主意......汇总观察到的数据分成多个桶

可以说,我有一个ID的的IObservable发送和接收消息的一个永无止境的流值(5- 10,000 /秒)。现在我想计算大量的聚合(例如总和) 以便定期分配给其他系统 - 让每个聚合每10秒钟说一次。 聚合基于元组的Id(字符串),但可能会落入多个聚合(聚合定义应包含哪些id - 因此会重叠)。

会有几千个聚合定义,所以有人有任何想法如何解决这个问题?

概念:

public struct Update 
{ 
    public string Id { get; } 

    public int Value { get; } 
} 

public class Aggregate 
{ 
    Dictionary<string, Update> latestValues = new Dictionary<string, Update>(); 

    public void AddUpdate(Update update) 
    { 
     latestValues[update.Id] = update; 
    } 

    public int CalculateSum() 
    { 
     return latestValues.Values.Select(v => v.Value).Sum(); 
    } 
} 

UPDATE:

这个问题的目的是为了简化真正的问题 - 也许我没有做一个好工作 - 对不起那个。 假设我有多个产生温度的IOT设备并定期报告此温度(更新流)。然后,不同的用户可以选择查看设备子集的聚合值(例如,平均值)。因此,一个客户可能希望看到设备1,2和3的平均值,而另一个客户可能希望看到设备2,3和4的平均值等等(聚合定义)。

+0

聚合组密钥总是等于“Id”,并且一条消息只能属于一个聚合组? – supertopi

+1

我们可以得到一个mcve吗? http://stackoverflow.com/help/mcve –

+0

你的意思是'10.000'在一万或十个小数点后面有三个零吗? – Enigmativity

回答

2

我想你在问什么您如何使用Rx创建实时阅读模型*。

鉴于我能猜到你的问题,我想你想能够更新一些当前状态,每个更新消息。在你的CalculateSum方法的情况下,你不能只汇总所有消息的Value属性,因为有些将被用来更新/覆盖现有值。

因此,考虑到这个假设,它看起来像GroupBy将是你的朋友。如果您首先将可观察的值序列分割为子序列,则可以分割并征服问题。

input.GroupBy(i=>i.Id) 

如果我们只考虑一个属于同一个Id的值流,那么每个值的总和应该是多少?

-1--1--2- 

在这种简单的情况下,答案总是只是直接传递的值。即

input -1--1--2- 
result -1--1--2- 

然而,当我们看两个序列生产值之成为稍硬计算

inputA -1-1-2-------- 
inputB --1-2-2-3-5-2- 
result -122344-5-7-4- 

在这里,我们需要看到的是什么三角洲是为序列中的每个值,并推动这一三角洲结果。这可以看作这种

inputA -1-1-2-------- 
delta -1-0-1-------- 

inputB --1-2-2-3-5-2- 
delta --1-1-0-1-2-(-3)- 

result -122344-5-7-4- 

造成这种三角投影的你可以写类似

input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue })) 
    .Select(acc => acc.Delta); 

把一起的代码看起来是这样的:

void Main() 
{ 
    var testScheduler = new TestScheduler(); 
    var input = testScheduler.CreateColdObservable<Update>(
     ReactiveTest.OnNext(010, new Update("a", 1)),  //1 
     ReactiveTest.OnNext(020, new Update("b", 1)),  //2 
     ReactiveTest.OnNext(030, new Update("c", 3)),  //5 
     ReactiveTest.OnNext(040, new Update("a", 1)),  //5 
     ReactiveTest.OnNext(050, new Update("b", 2)),  //6 
     ReactiveTest.OnNext(060, new Update("a", 2)),  //7 
     ReactiveTest.OnNext(070, new Update("b", 2)),  //7 
     ReactiveTest.OnNext(080, new Update("b", 3)),  //8 
     ReactiveTest.OnNext(090, new Update("b", 5)),  //10 
     ReactiveTest.OnNext(100, new Update("b", 2))  //7 

    ); 

    var currentSum = input.GroupBy(i => i.Id) 
     .SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue })) 
     .Select(acc => acc.Delta) 
     .Scan((acc, cur) => acc + cur); 

    var observer = testScheduler.CreateObserver<int>(); 
    var subscription = currentSum.Subscribe(observer); 
    testScheduler.Start(); 
    subscription.Dispose(); 

    ReactiveAssert.AreElementsEqual(new[] 
     { 
      ReactiveTest.OnNext(010, 1), 
      ReactiveTest.OnNext(020, 2), 
      ReactiveTest.OnNext(030, 5), 
      ReactiveTest.OnNext(040, 5), 
      ReactiveTest.OnNext(050, 6), 
      ReactiveTest.OnNext(060, 7), 
      ReactiveTest.OnNext(070, 7), 
      ReactiveTest.OnNext(080, 8), 
      ReactiveTest.OnNext(090, 10), 
      ReactiveTest.OnNext(100, 7)} 
     , 
     observer.Messages); 
} 

// Define other methods and classes here 
public struct Update 
{ 
    public Update(string id, int value) 
    { 
     Id = id; 
     Value = value; 
    } 
    public string Id { get; } 

    public int Value { get; } 
} 

如果你想创建多个聚合,那么每个新的聚合只是一个像上面这样的查询。您可以通过在分组之后共享/发布序列来优化目标,但我会首先确保这是分析所需的。

* CQRS/ES术语中的readmodels。

+0

谢谢你的详尽答案 - 作为一个奖励教给我一些关于TestScheduler的额外信息。我会试试这个! – BackendA