2012-01-19 33 views
1

我正在测试Reactive Extensions(NuGet的主分支),我在合并时遇到了一些问题。我并行运行多个操作,并希望在所有操作完成时收到通知,但我只是没有得到它。合并和onCompleted - 如何在所有并行操作完成时获取通知?

这里是我的操作,我使用Web客户端下载一个网页,然后计算字数:

private IObservable<int> GetWebsiteWordCount(Uri uri) 
    { 
     var client = new WebClient(); 

     var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted") 
        .ObserveOn(Scheduler.ThreadPool) 
        .Select(newString => newString.EventArgs.Result.Split(' ').Length); 

     client.DownloadStringAsync(uri); 

     return o; 
    } 

我然后创建很多这样的:

 var tasks = new List<IObservable<int>>() 
         { 
          GetWebsiteWordCount(new Uri("http://www.google.com", UriKind.Absolute)), 
          GetWebsiteWordCount(new Uri("http://www.bing.com", UriKind.Absolute)), 
          GetWebsiteWordCount(new Uri("http://www.yle.fi", UriKind.Absolute)) 
         }; 

之后,我用的是合并组合这些并尝试在全部完成时收到通知:

 tasks.Merge() 
      .ObserveOn(SynchronizationContext.Current) 
      .Subscribe(x => Debug.WriteLine(x), ex => Debug.WriteLine("exception thrown"), 
         () => Debug.WriteLine("all ready")); 

所有这些“任务”是正确执行和我在调试窗口的字数预期:

14279 
672 
292 

但我不明白“一切准备就绪”的消息。任何想法我失踪?

更新:使用的合并而不是

总和我也试图合并改成这样:

 var result = from i in tasks.ToObservable() 
        from r in i 
        select r; 

     result.Sum().Subscribe(x => Debug.WriteLine("all ready. sum: " + x)); 

但我从来没有得到结果返回。

更新:有固定的问题采取

感谢基甸Engelberth现在合并与和选项都工作。该解决方案是通过增加取(1)固定GetWebsiteWordCount法:

private IObservable<int> GetWebsiteWordCount(Uri uri) 
    { 
     var client = new WebClient(); 

     var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted") 
        .ObserveOn(Scheduler.ThreadPool) 
        .Select(newString => newString.EventArgs.Result.Split(' ').Length) 
        .Take(1); 

     client.DownloadStringAsync(uri); 

     return o; 
    } 

回答

2

Observable.FromEventPattern无法完成,因为它没有办法知道什么时候会有没有更多的事件。因为你知道这个特定事件是事件异步模式,所以它应该只触发一次。告诉这个可观察的,在GetWebsiteWordCount

+0

的某处添加一个.Take(1)谢谢!现在它工作:) –

相关问题