1
我正在测试Reactive Extensions(NuGet的主分支),我在合并时遇到了一些问题。我并行运行多个操作,并希望在所有操作完成时收到通知,但我只是没有得到它。合并和onCompleted - 如何在所有并行操作完成时获取通知?
这里是我的操作,我使用Web客户端下载一个网页,然后计算字数:
private IObservable<int> GetWebsiteWordCount(Uri uri)
{
var client = new WebClient();
var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
.ObserveOn(Scheduler.ThreadPool)
.Select(newString => newString.EventArgs.Result.Split(' ').Length);
client.DownloadStringAsync(uri);
return o;
}
我然后创建很多这样的:
var tasks = new List<IObservable<int>>()
{
GetWebsiteWordCount(new Uri("http://www.google.com", UriKind.Absolute)),
GetWebsiteWordCount(new Uri("http://www.bing.com", UriKind.Absolute)),
GetWebsiteWordCount(new Uri("http://www.yle.fi", UriKind.Absolute))
};
之后,我用的是合并组合这些并尝试在全部完成时收到通知:
tasks.Merge()
.ObserveOn(SynchronizationContext.Current)
.Subscribe(x => Debug.WriteLine(x), ex => Debug.WriteLine("exception thrown"),
() => Debug.WriteLine("all ready"));
所有这些“任务”是正确执行和我在调试窗口的字数预期:
14279
672
292
但我不明白“一切准备就绪”的消息。任何想法我失踪?
更新:使用的合并而不是
总和我也试图合并改成这样:
var result = from i in tasks.ToObservable()
from r in i
select r;
result.Sum().Subscribe(x => Debug.WriteLine("all ready. sum: " + x));
但我从来没有得到结果返回。
更新:有固定的问题采取
感谢基甸Engelberth现在合并与和选项都工作。该解决方案是通过增加取(1)固定GetWebsiteWordCount法:
private IObservable<int> GetWebsiteWordCount(Uri uri)
{
var client = new WebClient();
var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
.ObserveOn(Scheduler.ThreadPool)
.Select(newString => newString.EventArgs.Result.Split(' ').Length)
.Take(1);
client.DownloadStringAsync(uri);
return o;
}
的某处添加一个
.Take(1)
谢谢!现在它工作:) –