2013-06-11 62 views
6

我试图使用Reactive Extensions(Rx)来缓存任务完成时的枚举。有谁知道是否有一个干净的内置方式做到这一点? ToObservable扩展方法将只是一个IObservable<Task<T>>,这不是我想要的,我想要一个IObservable<T>,然后我可以使用Buffer上。转换IEnumerable <Task<T>>到IObservable <T>

人为的例子:

//Method designed to be awaitable 
public static Task<int> makeInt() 
{ 
    return Task.Run(() => 5); 
} 

//In practice, however, I don't want to await each individual task 
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{ 
    //Make a bunch of tasks 
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt()); 

    //Is there a built in way to turn this into an Observable that I can then buffer? 
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //???? 

    buffered.Subscribe(ints => { 
     Console.WriteLine(ints.Count()); //Should be 15 
    }); 
} 
+0

http://stackoverflow.com/questions/13500456/how-to-convert-an-ienumerabletaskt-to-iobservablet –

回答

7

您可以使用一个事实,即Task可以转换为使用another overload of ToObservable()观察到。

当您有一个(单项)观察值的集合时,您可以创建一个包含项目的单个观察值,因为它们使用Merge()完成。

所以,你的代码看起来是这样的:

futureInts.Select(t => t.ToObservable()) 
      .Merge() 
      .Buffer(15) 
      .Subscribe(ints => Console.WriteLine(ints.Count)); 
相关问题