2016-07-11 17 views
1

我正在使用反应式编程来做一堆计算。这里是跟踪两个数和它们的和的简单示例:如何只发出一致的计算?

static void Main(string[] args) { 
    BehaviorSubject<int> x = new BehaviorSubject<int>(1); 
    BehaviorSubject<int> y = new BehaviorSubject<int>(2); 
    var sum = Observable.CombineLatest(x, y, (num1, num2) => num1 + num2); 
    Observable 
     .CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum }) 
     .Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}")); 
    x.OnNext(3); 
    Console.ReadLine(); 
} 

这产生以下输出:

X:1 Y:2 Sum:3 
X:3 Y:2 Sum:3 
X:3 Y:2 Sum:5 

通知第二输出结果如何为“不正确”,因为它表示的是3 + 2 = 3。我明白为什么会发生这种情况(x在更新之前更新),但我希望我的输出计算是原子性/一致的 - 在所有相关计算完成之前不应发射任何值。我的第一种方法是...

Observable.When(sum.And(Observable.CombineLatest(x, y)).Then((s, xy) => new { Sum = s, X = xy[0], Y = xy[1] })); 

这似乎适用于我的简单示例。但我的实际代码有很多计算值,我无法弄清楚如何调整它。例如,如果有一个和平和平衡,我不知道如何在采取行动之前等待其中的每一个发射一些东西。

应该工作的一种方法(in-theory)是为所有我关心的值添加时间戳,如下所示。

Observable 
    .CombineLatest(x.Timestamp(), y.Timestamp(), sum.Timestamp(), (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum }) 
    .Where(i=>i.Sum.Timestamp>i.X.Timestamp && i.Sum.Timestamp>i.Y.Timestamp) 
    // do the calculation and subscribe 

该方法适用于非常复杂的模型。我所要做的就是确保不会发出比任何核心数据值都早的计算值。我觉得这是一个混乱。它并没有在我的控制台应用程序中实际工作。当我用自定义扩展替换Timestamp时,它指定了顺序int64,它确实有效。

什么是简单,干净的方式来处理这种事情?

=======

我在这里取得了一些进展。在等待触发计算的数据值之前,等待sum和sumSquared发出一个值。

var all = Observable.When(sum.And(sumSquared).And(Observable.CombineLatest(x, y)).Then((s, q, data) 
    => new { Sum = s, SumSquared = q, X = data[0], Y = data[1] })); 

回答

1

这应该做你想要什么:

Observable.CombineLatest(x, y, sum) 
    .DistinctUntilChanged(list => list[2]) 
    .Subscribe(list => Console.WriteLine("{0}+{1}={2}", list[0], list[1], list[2])); 

它等待,直到总和已经更新,这意味着它的所有来源必须已经过更新。

+0

这很简单。我仍然对这些方法感到困扰。当我创建它们并针对这些依赖性定制我的查询时,它们要求我定义可观察对象之间的依赖关系。在Excel中,您只需更改一个单元格,所有内容都会更新;没有中间的无效信息看。我希望有一种方法可以做到这一点,比如Observable.CombineLatest(bool waitForSynchronousUpdatesToComplete)。 – JustinM

+0

假设IObservable 代表电子邮件地址,IObservable 指示电子邮件地址是否格式正确。如果你想同时显示两条信息,你不应该等待DistinctUntilChanged,因为验证结果可能没有改变。在这种情况下,也许最好是使用Zip关联输入和匹配输出,并发出重复的验证消息。再次 - 使用IObservable的东西需要太多关于如何定义函数的知识。应该有更好的方式来使它像Excel一样工作。 – JustinM

+0

您目前的设置是并行流,这有很多优点,但这意味着流之间没有排序。 –

0

您的问题不是因为x更新前总和更新本身。这关乎你构建查询的方式。

您已经有效地创建了两个查询:Observable.CombineLatest(x, y, (num1, num2) => num1 + num2) & Observable.CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })。由于在每个订阅x中,您已经创建了两个订阅。这意味着当更新x时会发生两次更新。

您需要避免创建两个订阅。

如果你写你这样的代码:

BehaviorSubject<int> x = new BehaviorSubject<int>(1); 
BehaviorSubject<int> y = new BehaviorSubject<int>(2); 

Observable 
    .CombineLatest(x, y, (num1, num2) => new 
    { 
     X = num1, 
     Y = num2, 
     Sum = num1 + num2 
    }) 
    .Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}")); 

x.OnNext(3); 

...那么你正确地得到这个输出:

 
X:1 Y:2 Sum:3 
X:3 Y:2 Sum:5 
+0

我正在尝试构建一些简单的小函数,然后使用CombineLatest等运算符将它们组合在一起以构建更复杂的函数。我得到答案,但不知道他们是什么答案,所以输出“x:1 y:2 sum:3”很难。您的解决方案要求每个函数都发出结果和输入参数。这使得关联输入和输出变得容易,但对于基于几层其他功能的功能,它不能很好地扩展。 – JustinM

+0

@JustinM - 然后你需要尝试在你的查询中使用'.Zip'和'.Publish'。使用'.CombineLatest'就是为什么你的查询不起作用。也许你可以发布一个更复杂的例子来说明你实际上想要做什么,并且我可以做一些事情? – Enigmativity

+0

好吧我会这样做,不知道为什么需要发布 – JustinM

0

我已经开始让我的头多了一些。这是我想要完成的更详细的例子。这是一些验证名和姓的代码,并且只有在两个部分都有效时才能生成全名。正如你所看到的,我正尝试使用一些小的独立定义的函数,比如“firstIsValid”,然后将它们组合在一起来计算更复杂的东西。

看来我在这里面临的挑战是试图关联我的函数中的输入和输出。例如,“firstIsValid”会生成一个输出,表示某个名字是有效的,但不会告诉您哪个名字是有效的。在下面的选项2中,我可以使用Zip关联它们。

如果验证功能不会为每个输入生成一个输出,则此策略将不起作用。例如,如果用户正在输入网址,并且我们正在尝试在网络上验证它们,也许我们会做一个节流阀和/或开关。一个“webAddressIsValid”可能有10个网址。在这种情况下,我认为我必须在输入中包含输出。也许有一个IObservable>其中字符串是Web地址,布尔是它是否有效。

static void Main(string[] args) { 
    var first = new BehaviorSubject<string>(null); 
    var last = new BehaviorSubject<string>(null); 
    var firstIsValid = first.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true); 
    var lastIsValid = last.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true); 

    // OPTION 1 : Does not work 
    // Output: bob smith, bob, bob roberts, roberts 
    // firstIsValid and lastIsValid are not in sync with first and last 
    //var whole = Observable 
    // .CombineLatest(first, firstIsValid, last, lastIsValid, (f, fv, l, lv) => new { 
    //  First = f, 
    //  Last = l, 
    //  FirstIsValid = fv, 
    //  LastIsValid = lv 
    // }) 
    // .Where(i => i.FirstIsValid && i.LastIsValid) 
    // .Select(i => $"{i.First} {i.Last}"); 

    // OPTION 2 : Works as long as every change in a core data value generates one calculated value 
    // Output: bob smith, bob robert 
    var firstValidity = Observable.Zip(first, firstIsValid, (f, fv) => new { Name = f, IsValid = fv }); 
    var lastValidity = Observable.Zip(last, lastIsValid, (l, lv) => new { Name = l, IsValid = lv }); 
    var whole = 
     Observable.CombineLatest(firstValidity, lastValidity, (f, l) => new { First = f, Last = l }) 
     .Where(i => i.First.IsValid && i.Last.IsValid) 
     .Select(i => $"{i.First.Name} {i.Last.Name}"); 

    whole.Subscribe(i => Console.WriteLine(i)); 

    first.OnNext("bob"); 
    last.OnNext("smith"); 
    last.OnNext(null); 
    last.OnNext("roberts"); 
    first.OnNext(null); 

    Console.ReadLine(); 
} 

这里的另一种方法。每个值都有一个版本号(如时间戳)。任何时候计算的值比数据(或其它依赖的计算值)都要旧,我们可以忽略它。

public class VersionedValue { 
    static long _version; 
    public VersionedValue() { Version = Interlocked.Increment(ref _version); } 
    public long Version { get; } 
} 

public class VersionedValue<T> : VersionedValue { 
    public VersionedValue(T value) { Value = value; } 
    public T Value { get; } 
    public override string ToString() => $"{Value} {Version}"; 
} 

public static class ExtensionMethods { 
    public static IObservable<VersionedValue<T>> Versioned<T>(this IObservable<T> values) => values.Select(i => new VersionedValue<T>(i)); 
    public static VersionedValue<T> AsVersionedValue<T>(this T obj) => new VersionedValue<T>(obj); 
} 

static void Main(string[] args) { 
    // same as before 
    // 
    var whole = Observable 
     .CombineLatest(first.Versioned(), firstIsValid.Versioned(), last.Versioned(), lastIsValid.Versioned(), (f, fv, l, lv) => new { 
      First = f, 
      Last = l, 
      FirstIsValid = fv, 
      LastIsValid = lv 
     }) 
     .Where(i => i.FirstIsValid.Version > i.First.Version && i.LastIsValid.Version > i.Last.Version) 
     .Where(i => i.FirstIsValid.Value && i.LastIsValid.Value) 
     .Select(i => $"{i.First.Value} {i.Last.Value}"); 
+0

对于确定性顺序中发生了什么,我还是有点困惑,什么不是。我已经问过类似的问题,但可能更清晰的问题在这里:http://stackoverflow.com/questions/38364746/how-to-correlate-function-inputs-and-outputs – JustinM