我想你在问什么您如何使用Rx创建实时阅读模型*。
鉴于我能猜到你的问题,我想你想能够更新一些当前状态,每个更新消息。在你的CalculateSum
方法的情况下,你不能只汇总所有消息的Value
属性,因为有些将被用来更新/覆盖现有值。
因此,考虑到这个假设,它看起来像GroupBy
将是你的朋友。如果您首先将可观察的值序列分割为子序列,则可以分割并征服问题。
input.GroupBy(i=>i.Id)
如果我们只考虑一个属于同一个Id的值流,那么每个值的总和应该是多少?
-1--1--2-
在这种简单的情况下,答案总是只是直接传递的值。即
input -1--1--2-
result -1--1--2-
然而,当我们看两个序列生产值之成为稍硬计算
inputA -1-1-2--------
inputB --1-2-2-3-5-2-
result -122344-5-7-4-
在这里,我们需要看到的是什么三角洲是为序列中的每个值,并推动这一三角洲结果。这可以看作这种
inputA -1-1-2--------
delta -1-0-1--------
inputB --1-2-2-3-5-2-
delta --1-1-0-1-2-(-3)-
result -122344-5-7-4-
造成这种三角投影的你可以写类似
input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue }))
.Select(acc => acc.Delta);
把一起的代码看起来是这样的:
void Main()
{
var testScheduler = new TestScheduler();
var input = testScheduler.CreateColdObservable<Update>(
ReactiveTest.OnNext(010, new Update("a", 1)), //1
ReactiveTest.OnNext(020, new Update("b", 1)), //2
ReactiveTest.OnNext(030, new Update("c", 3)), //5
ReactiveTest.OnNext(040, new Update("a", 1)), //5
ReactiveTest.OnNext(050, new Update("b", 2)), //6
ReactiveTest.OnNext(060, new Update("a", 2)), //7
ReactiveTest.OnNext(070, new Update("b", 2)), //7
ReactiveTest.OnNext(080, new Update("b", 3)), //8
ReactiveTest.OnNext(090, new Update("b", 5)), //10
ReactiveTest.OnNext(100, new Update("b", 2)) //7
);
var currentSum = input.GroupBy(i => i.Id)
.SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
.Select(acc => acc.Delta)
.Scan((acc, cur) => acc + cur);
var observer = testScheduler.CreateObserver<int>();
var subscription = currentSum.Subscribe(observer);
testScheduler.Start();
subscription.Dispose();
ReactiveAssert.AreElementsEqual(new[]
{
ReactiveTest.OnNext(010, 1),
ReactiveTest.OnNext(020, 2),
ReactiveTest.OnNext(030, 5),
ReactiveTest.OnNext(040, 5),
ReactiveTest.OnNext(050, 6),
ReactiveTest.OnNext(060, 7),
ReactiveTest.OnNext(070, 7),
ReactiveTest.OnNext(080, 8),
ReactiveTest.OnNext(090, 10),
ReactiveTest.OnNext(100, 7)}
,
observer.Messages);
}
// Define other methods and classes here
public struct Update
{
public Update(string id, int value)
{
Id = id;
Value = value;
}
public string Id { get; }
public int Value { get; }
}
如果你想创建多个聚合,那么每个新的聚合只是一个像上面这样的查询。您可以通过在分组之后共享/发布序列来优化目标,但我会首先确保这是分析所需的。
* CQRS/ES术语中的readmodels。
聚合组密钥总是等于“Id”,并且一条消息只能属于一个聚合组? – supertopi
我们可以得到一个mcve吗? http://stackoverflow.com/help/mcve –
你的意思是'10.000'在一万或十个小数点后面有三个零吗? – Enigmativity