2016-02-11 45 views
1

我已经对反应性linq(也称为Rx.NET)做了一些研究。我已经看到SelectMany方法的行为如下最终结果:展平observable中的内容,使其从多个流转到单个流。我的问题是,是否有一种方法在完全相反的方向上行事?这意味着在将多个事件放入一个可观察的事件(multiplexing)后,如何实现这一点?我知道GroupBy与我请求的行为非常相似,除了一件事情不存在:我想要一个可能还没有发布的密钥的可观察对象。Rx.net中SelectMany的相反结果

我正在考虑自己实施行为,但如果有机会,我错过了这种方法的存在,请告诉我我错了,它确实存在!

+0

我在想'GroupBy'或'Where'会做诡计。你能张贴代码说明如何/为什么这些不足? – Shlomo

+0

使用一个简单的'.Where'是对这个最明显的答案,因为分组和投影一个还没有出现的组是一样的,只是做'.Where'。你能详细说明吗? – Enigmativity

回答

2

你是对的GroupBy是你想要的。设置它可以很容易地提前订阅任何密钥。只是这样做:

IObservable<int> oddNumbers = 
    Observable 
     .Range(0, 10) 
     .GroupBy(x => x % 2) 
     .Where(gx => gx.Key == 1) 
     .Merge(); 

如果我同意,我得到:

 
1 
3 
5 
7 
9 

然而,这一点的时候,因为这是浪费直接等价的:

IObservable<int> oddNumbers = 
    Observable 
     .Range(0, 10) 
     .Where(x => x % 2 == 1); 

我可以看到它可能有用的唯一方法是如果你这样做:

IConnectableObservable<IGroupedObservable<int, int>> groupedNumbers = 
    Observable 
     .Range(0, 10) 
     .GroupBy(x => x % 2) 
     .Publish(); 

Func<IConnectableObservable<IGroupedObservable<int, int>>, int, IObservable<int>> anyProject = 
    (source, key) => 
     source 
      .Where(gx => gx.Key == key) 
      .Merge(); 

IObservable<int> oddNumbers = anyProject(groupedNumbers, 1); 

oddNumbers.Subscribe(x => Console.WriteLine(x)); 

groupedNumbers.Connect(); 

这仍然给我奇数,但我现在可以创建偶数可观察而无需开始新的分组。

3

GroupBy是你正在寻找的东西,但正如你指出的那样,它在预先创建“已知”群体方面确实不足。不过我想你可能是创意和种子与价值观的顺序为每个这些已知的组这样的:

如果来源是这样

Observable.Interval(TimeSpan.FromMilliseconds(100)) 
    .GroupBy(i => i % 4) 

您可以修改,要成为像

var seed = new[] { 0L, 1L, 2L, 3L }.ToObservable(); 
Observable.Concat(seed, Observable.Interval(TimeSpan.FromMilliseconds(100))) 
    .GroupBy(i => i % 4) 
    .Select(grp=>grp.Skip(1)) //Ignore the first/seed value.