2014-07-10 29 views
2

在我的Rx代码中,我使用GroupBy创建IObservable<IGroupedObservable<T>>。从那里我希望对IGroupedObservable进行一些转换,同时保持关于组的知识(Key)。使用和维护IGroupedObservable

例如,

IObservable<IGroupedObservable<T>> CreateGroup(this IObservable<T> obs) 
{ 
    return obs.GroupBy(o => o.Something); 
} 

IGroupedObservable<A> Foo(this IGroupedObservable<T> obs) 
{ 
    return obs.Select(o => new A(o)); 
} 

IGroupedObservable<B> Bar(this IGroupedObservable<A> obs) 
{ 
    return obs.Select(o => new B(o)); 
} 

IObservable<IGroupedObservable<B>> stillGrouped = initialObservable.CreateGroup().Select(grouped => grouped.Foo().Bar()); 

当然,我不能这样做,因为在富的Select()失去IGroupedObservable的烦躁。

有没有人看到过这个解决方案?

编辑TLDR我试图撰写依赖于可观察已被分组操作,和我喜欢的类型系统来执行这对我来说。我想我可以让这些操作采用组Key,并在前面执行一个Where Key,但是这样会变得越来越混乱。

回答

2

嗯。

一个蛮力的方法是复制你需要的observable方法。像这样:

private class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement> 
{ 
    private readonly IObservable<TElement> _o; 
    private readonly TKey _k; 

    public TKey Key { get { return _k } } 

    public GroupedObservable(TKey key, IObservable<TElement> o) 
    { 
     _key = key; 
     _o = ; 
    } 

    public IDisposable Subscribe(IObserver<TElement> observer) { return _o.Subscribe(observer); } 
} 

public static IGroupedObservable<TKey, TResult> Select<TKey, TSource, TResult>(this IGroupedObservable<TKey, TSource> source, Func<TSource, TResult> selector) 
{ 
    return new GroupedObservable<TKey, TResult>(source.Key, ((IObservable<TSource>)source).Select(selector)); 
} 

随着时间的推移,你会建立一个你需要的方法库。

另一种简单的方法是只使用Select改变你IGroupedObservable<TKey, TElement>IObservable<KeyValuePair<TKey, TElement>>

public static IObservable<KeyValuePair<TKey, TValue>> ToKV<TKey, TValue>(this IGroupedObservable<TKey, TValue> g) 
{ 
    return g.Select(v => new KeyValuePair<TKey, TValue>(g.Key, v)); 
} 

initialObservable.CreateGroup().Select(group => group.ToKV()); 
+0

是啊,这第一个选择可能就是我会去。我希望[rxx](http://rxx.codeplex.com/)或其他一些库可以创建这些扩展:(。 – Anthony