2013-11-20 115 views
0

这里我们有一个Observable Sequence ...在.NET中使用Rx。合并观察对象

var aSource = new Subject<int>(); 

var bSource = new Subject<int>(); 

var paired = Observable 
      .Merge(aSource, bSource) 
    .GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1)); 

paired.Subscribe(g => Console.WriteLine("{0}:{1}", g.ElementAt(0), g.ElementAt(1))); 

aSource.OnNext(4); 
bSource.OnNext(1); 
aSource.OnNext(2); 
bSource.OnNext(5); 
aSource.OnNext(3); 
bSource.OnNext(3); 
aSource.OnNext(5); 
bSource.OnNext(2); 
aSource.OnNext(1); 
bSource.OnNext(4); 

输出: 3:3 5:5 2:2 1:1 4:4

我们会得到每一个事件,一对数字具有相同的ID到达时间。

完美!正是我想要的。

一组两个,按价值配对。

下一个问题....

如何获得价值序列的的SelectMany /缓冲。

因此1,2,3,4,5通过OnNext()到达aSource和bSource。然后启动1-5的FireWireLine()。然后当2,3,4,5,6到达时,我们得到另一个console.writeline()。任何线索任何人?

随即,在Rx论坛建议看.Window()

http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

这表面上看起来很完美。就我而言,在这种情况下,我需要一个价值4的窗口。

在查询序列中它属于获取此效果的位置? (a),bSource).GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1));

输出 1,2,3,4,5:1,2,3,4,5 2,3,4,5,6 :2,3,4,5,6

此致,

丹尼尔

+0

对于第二部分,数字按顺序到达每个源?或随机顺序? –

+0

它可以是随机的。它们是不同长度“长”过程的结果。 – WebSight

回答

1

假设事件的来源随机到达,用我的回答"Reordering events with Reactive Extensions"得到事件秩序。

然后使用Observable.Buffer来创建滑动缓冲区:

// get this using the OrderedCollect/Sort in the referenced question 
IObservable<int> orderedSource; 

// then subscribe to this 
orderedSource.Buffer(5, 1); 
+0

谢谢,今晚我回到家时,我会试试这个。我假设根据上面的编辑,Window()是针对有序序列的。 – WebSight

+0

正确,'Window'为您提供一串流,'Buffer'为列表流。值得注意的是,你只在关闭时才得到缓冲区,但窗口流开始立即发送。 –

+0

对,看这个空间....我会在这里发布结果与任何进一步的问题。再次感谢詹姆斯。 – WebSight

0

下面是一个扩展方法,当它有n个相同的ID的输入火灾。

public static class RxExtension 
    { 

     public static IObservable<TSource> MergeBuffer<TSource>(this IObservable<TSource> source, Func<TSource, int> keySelector, Func<IList<TSource>,TSource> mergeFunction, int bufferCount) 
     { 
      return Observable.Create<TSource>(o => { 
       var buffer = new Dictionary<int, IList<TSource>>(); 
       return source.Subscribe<TSource>(i => 
       { 
        var index = keySelector(i); 
        if (buffer.ContainsKey(index)) 
        { 
         buffer[index].Add(i); 
        } 
        else 
        { 
         buffer.Add(index, new List<TSource>(){i}); 
        } 
        if (buffer.Count==bufferCount) 
        { 
         o.OnNext(mergeFunction(buffer[index])); 
         buffer.Remove(index); 
        } 
       }); 
      }); 
     } 
    } 

调用扩展。

mainInput = Observable.Merge(inputNodes.ToArray()).MergeBuffer<NodeData>(x => x.id, x => MergeData(x), 1);