2011-06-03 42 views
0

我有麻烦服用无扩展主题为<IEnumerable的<Obj>>以主题为<IEnumerable的<AggregatedObj >>

public Subject<IEnumerable<Person>> PersonDataSubject; 

对象并将其转换为:

public Subject<IEnumerable<BornInYear>> BornInYearSubject; 

...使用一些linq聚合。

下面的例子将它放在更多的上下文中,我挣扎的地方在于如何从PersonDataSubject的订阅中获取一个IEnumerable到BornInYearSubject中。

无论我尝试什么,我最终得到IObservable<BornInYear>,而不是IObservable<IEnumerable<BornInYear>>

目标是让该类的客户能够订阅两个主题,并在每个“下一个”通知中获得相应类型的IEnumerable。

public class ReactiveTest 
{ 
    public class Person 
    { 
     public string name; 
     public DateTime dob; 
    }; 

    public class BornInYear 
    { 
     public int Year; 
     public int Count; 
    } 

    public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>(); 
    public Subject<IEnumerable<BornInYear>> BornInYearSubject= new Subject<IEnumerable<BornInYear>>(); 

    public void LoadData() 
    { 
     // Go to hypotheritical web service and get batch of people. 
     IEnumerable<Person> people = WebService.Fetch(); 

     // Notify subscribers we have a fresh batch of data. 
     PersonDataSubject.OnNext(people); 
    } 

    public ReactiveTest() 
    { 
     // Hookup BornInYearSubject to listen to PersonDataSubject and publish the summarised data. 
     PersonDataSubject.Subscribe(pd => pd.GroupBy(p => p.dob.Year) 
              .Select(ps => new BornInYear { Year = ps.Key, Count = ps.Count()}) 
              .AsParallel() 
      ); 

     // How do I get the results of this out and published onto BornInYearSubject? 
    } 
} 

现在我知道我可以使用Task.Factory.StartNew(...)...为实现这个我订阅OnNext为PersonDataSubject但我相信它必须能够保持更多的反应?

+0

这有什么错用的IObservable >? – 2011-06-03 17:20:02

回答

0

好这部作品。感谢这些创意家伙 - 事后回想起来似乎非常明显!

using System; 
using System.Collections.Generic; 
using System.Linq; 

namespace TestReactive 
{ 
    public class ReactiveTest 
    { 
     public class Person 
     { 
      public string name; 
      public DateTime dob; 
     }; 

     public class BornInYear 
     { 
      public int Year; 
      public int Count; 
     } 

     public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>(); 
     public Subject<IEnumerable<BornInYear>> BornInYearSubject = new Subject<IEnumerable<BornInYear>>(); 

     public void LoadData() 
     { 
      IEnumerable<Person> people = new List<Person> { 
       new Person() {name = "Bill", dob = DateTime.Now.AddYears(-10)}, 
       new Person() {name = "Pete", dob = DateTime.Now.AddYears(-5)}, 
       new Person() {name = "Judy", dob = DateTime.Now.AddYears(-1)}, 
       new Person() {name = "Mike", dob = DateTime.Now.AddYears(-5)}, 
       new Person() {name = "Jake", dob = DateTime.Now.AddYears(-5)}, 
       new Person() {name = "Fred", dob = DateTime.Now.AddYears(-13)}, 
      }; 

      // Notify subscribers we have a fresh batch of data. 
      PersonDataSubject.OnNext(people); 
     } 

     public ReactiveTest() 
     { 
      var subj = PersonDataSubject.Select(pds => pds.GroupBy(pd => pd.dob.Year) 
                  .Select(p => new BornInYear { 
                   Year = p.Key, Count = p.Count() 
                  }).AsParallel()); 
      subj.Subscribe(BornInYearSubject); 

      BornInYearSubject.Subscribe(x=> Console.WriteLine("{0}", x.Count())); 
      LoadData(); 
     } 
    } 

    class Program 
    { 
     static void Main(string[] args) 
     { 
      ReactiveTest rt = new ReactiveTest(); 
     } 
    } 
} 
1

如何:

PersonDataSubject 
    .GroupBy(x => x.Dob.Year) 
    .Select(x => x.Aggregate(new List<BornInYear>(), (acc, x) => { acc.Add(new BornInYear { Year = ps.Key }); return acc; })) 
+0

会聚合返回什么w/o OnCompleted? – 2011-06-03 19:02:48

+0

@Scott你说得对,我假设PersonDataSubject最终会完成。如果不是,则用'扫描'替换'Aggregate' – 2011-06-03 20:13:13

+0

这不会起作用,因为PersonDataSubject是IEnumerable 因此您不能直接调用GroupBy,它是需要分组的IEnumerable的内容。给了我一些想法,尽管如此还会继续发挥。谢谢。 – DanH 2011-06-06 11:04:09

相关问题