我不能承认Richard Szalay的解决方案是可以接受的。如果您启动100个请求,并且第二个请求由于服务器不可用错误而失败(例如),则剩余的98个请求将中止。第二个问题是UI如何对这种可观察性做出反应?太伤心。
请牢记章节4.3的Rx Design Guidelines我希望通过Observable.Using()运算符表示WebRequest observable。但WebRequest不是一次性的!所以我定义DisposableWebRequest:
public class DisposableWebRequest : WebRequest, IDisposable
{
private static int _Counter = 0;
private readonly WebRequest _request;
private readonly int _index;
private volatile bool _disposed = false;
public DisposableWebRequest (string url)
{
this._request = WebRequest.Create(url);
this._index = ++DisposableWebRequest._Counter;
}
public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state)
{
return this._request.BeginGetResponse(callback, state);
}
public override WebResponse EndGetResponse(IAsyncResult asyncResult)
{
Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId));
Trace.Flush();
if (!this._disposed)
{
return this._request.EndGetResponse(asyncResult);
}
else
{
return null;
}
}
public override WebResponse GetResponse()
{
return this._request.GetResponse();
}
public override void Abort()
{
this._request.Abort();
}
public void Dispose()
{
if(!this._disposed)
{
this._disposed = true;
Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId));
Trace.Flush();
this.Abort();
}
}
}
然后,我创建WPF窗口,并把两个按钮就可以了(启动和停止)。
因此,让我们创建合适的请求可观察集合。 首先,定义URL的可观察到的创建功能:
Func<IObservable<string>> createUrlObservable =() =>
Observable
.Return("http://yahoo.com")
.Repeat(100)
.OnStartup(() =>
{
this._failed = 0;
this._successed = 0;
});
在每一个URL,我们应该创建WebRequest的obervable,所以:
Func<string, IObservable<WebResponse>> createRequestObservable =
url =>
Observable.Using(
() => new DisposableWebRequest("http://yahoo.com"),
r =>
{
Trace.WriteLine("Queued " + url);
Trace.Flush();
return Observable
.FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)();
});
另外定义两个事件可观察其反应上的按钮“开始” /”停止”点击:
var startMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StartButton, "Click");
var stopMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StopButton, "Click");
所以砖是准备好了,时间到撰写他们(我这样做是鉴于构造刚过的InitializeComponent()):
startMouseDown
.SelectMany(down =>
createUrlObservable()
.SelectMany(url => createRequestObservable(url)
.TakeUntil(stopMouseDown)
.Select(r => r.GetResponseStream())
.Do(s =>
{
using (var sr = new StreamReader(s))
{
Trace.WriteLine(sr.ReadLine());
Trace.Flush();
}
})
.Do(r => this._successed++)
.HandleError(e => this._failed++))
.ObserveOnDispatcher()
.Do(_ => this.RefresLabels(),
e => { },
() => this.RefresLabels())
)
.Subscribe();
您可能想知道函数“HandleError()”。如果在EndGetResponse()调用中发生异常(我关闭了网络连接来重现它),并且在请求observable中没有捕获 - 它会使startMouseDown observable崩溃。的HandleError捕获异常默默地,perfom提供行动和替代呼叫的OnError明年观测它调用OnCompleted:
public static class ObservableExtensions
{
public static IObservable<TSource> HandleError<TSource>(this IObservable<TSource> source, Action<Exception> errorHandler)
{
return Observable.CreateWithDisposable<TSource>(observer =>
{
return source.Subscribe(observer.OnNext,
e =>
{
errorHandler(e);
//observer.OnError(e);
observer.OnCompleted();
},
observer.OnCompleted);
});
}
}
最后莫名其妙的地方是方法RefreshLabels,这将更新UI控件:
private void RefresLabels()
{
this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed);
this.FailedLabel.Content = string.Format("Failed {0}", this._failed);
}
你的代码似乎不正确:在`IObservable <>`上没有可调用的FromAsyncPattern扩展方法。该方法只是静态的。不扩展。如果你花时间想出真正的代码,问题和解决方案将变得明显。 – 2010-12-24 04:47:20